Skip to content

Commit

Permalink
Fix the issues of XDS flow control
Browse files Browse the repository at this point in the history
Signed-off-by: hanbingleixue <[email protected]>
  • Loading branch information
hanbingleixue committed Feb 6, 2025
1 parent 91e61f3 commit 7a09e77
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -139,16 +140,21 @@ private static XdsInstanceCircuitBreakers parseInstanceCircuitBreakers(Cluster c
xdsInstanceCircuitBreakers.setConsecutiveGatewayFailure(outlierDetection.getConsecutiveGatewayFailure()
.getValue());
xdsInstanceCircuitBreakers.setConsecutive5xxFailure(outlierDetection.getConsecutive5Xx().getValue());
long interval = java.time.Duration.ofSeconds(outlierDetection.getInterval().getSeconds()).toMillis();
xdsInstanceCircuitBreakers.setInterval(interval);
long ejectionTime = java.time.Duration.ofSeconds(outlierDetection.getBaseEjectionTime().getSeconds())
.toMillis();
xdsInstanceCircuitBreakers.setBaseEjectionTime(ejectionTime);
xdsInstanceCircuitBreakers.setInterval(getDurationInMillis(outlierDetection.getInterval()));
xdsInstanceCircuitBreakers.setBaseEjectionTime(getDurationInMillis(outlierDetection.getBaseEjectionTime()));
xdsInstanceCircuitBreakers.setMaxEjectionPercent(outlierDetection.getMaxEjectionPercent().getValue());
xdsInstanceCircuitBreakers.setFailurePercentageMinimumHosts(outlierDetection.getFailurePercentageMinimumHosts()
.getValue());
xdsInstanceCircuitBreakers.setMinHealthPercent(cluster.getCommonLbConfig().
getHealthyPanicThreshold().getValue());
return xdsInstanceCircuitBreakers;
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
long interval = duration.getSeconds();
if (interval != 0) {
return Duration.ofSeconds(interval).toMillis();
}
return Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,7 @@ private static XdsRetryPolicy parseRetryPolicy(RetryPolicy retryPolicy) {
xdsRetryPolicy.setRetryConditions(Arrays.asList(retryPolicy.getRetryOn().split(CommonConstant.COMMA)));
}
xdsRetryPolicy.setMaxAttempts(retryPolicy.getNumRetries().getValue());
long perTryTimeout = Duration.ofSeconds(retryPolicy.getPerTryTimeout().getSeconds()).toMillis();
xdsRetryPolicy.setPerTryTimeout(perTryTimeout);
xdsRetryPolicy.setPerTryTimeout(getDurationInMillis(retryPolicy.getPerTryTimeout()));
return xdsRetryPolicy;
}

Expand Down Expand Up @@ -307,8 +306,7 @@ private static XdsAbort parseAbort(FaultAbort faultAbort) {

private static XdsDelay parseDelay(FaultDelay faultDelay) {
XdsDelay xdsDelay = new XdsDelay();
long fixedDelay = Duration.ofSeconds(faultDelay.getFixedDelay().getSeconds()).toMillis();
xdsDelay.setFixedDelay(fixedDelay);
xdsDelay.setFixedDelay(getDurationInMillis(faultDelay.getFixedDelay()));
io.sermant.core.service.xds.entity.FractionalPercent fractionalPercent =
new io.sermant.core.service.xds.entity.FractionalPercent();
fractionalPercent.setNumerator(faultDelay.getPercentage().getNumerator());
Expand Down Expand Up @@ -448,4 +446,12 @@ private static Optional<XdsTokenBucket> parseTokenBucket(Struct tokenBucketStruc
xdsTokenBucket.setTokensPerFill((int) tokensPerFill);
return Optional.of(xdsTokenBucket);
}

private static long getDurationInMillis(com.google.protobuf.Duration duration) {
long interval = duration.getSeconds();
if (interval != 0) {
return Duration.ofSeconds(interval).toMillis();
}
return Duration.ofNanos(duration.getNanos()).toMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ public class CommonConst {
*/
public static final String DEFAULT_CONTENT_TYPE = "text/plain";

/**
* Minimum response code for a successful request
*/
public static final int MIN_SUCCESS_STATUS_CODE = 200;

/**
* Maximum response code for a successful request
*/
public static final int MAX_SUCCESS_STATUS_CODE = 399;

private CommonConst() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import io.sermant.core.plugin.config.PluginConfigManager;
import io.sermant.core.service.xds.entity.XdsRetryPolicy;
import io.sermant.core.utils.CollectionUtils;
import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.config.CommonConst;
import io.sermant.flowcontrol.common.config.FlowControlConfig;
import io.sermant.flowcontrol.common.support.ReflectMethodCacheSupport;
import io.sermant.flowcontrol.common.xds.retry.ExceptionRetryConditionType;
import io.sermant.flowcontrol.common.xds.retry.ResultRetryConditionType;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;
import io.sermant.flowcontrol.common.xds.retry.RetryConditionType;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -89,15 +92,16 @@ public boolean isNeedRetry(Object result, XdsRetryPolicy retryPolicy) {
return false;
}
String statusCode = statusCodeOptional.get();
if (conditions.contains(statusCode)) {
return true;
if (isSuccess(statusCode)) {
return false;
}
for (String conditionName : conditions) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = ResultRetryConditionType
.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, null, statusCode, result)) {
if (retryConditionOptional.get().isNeedRetry(this, null, statusCode, result)) {
return true;
}
}
Expand All @@ -110,17 +114,32 @@ public boolean isNeedRetry(Throwable ex, XdsRetryPolicy retryPolicy) {
return false;
}
for (String conditionName : retryPolicy.getRetryConditions()) {
Optional<RetryCondition> retryConditionOptional = RetryConditionType.getRetryConditionByName(conditionName);
Optional<RetryCondition> retryConditionOptional = ExceptionRetryConditionType
.getRetryConditionByName(conditionName);
if (!retryConditionOptional.isPresent()) {
continue;
}
if (retryConditionOptional.get().needRetry(this, ex, null, null)) {
if (retryConditionOptional.get().isNeedRetry(this, ex, null, null)) {
return true;
}
}
return false;
}

/**
* Determine if the request is successful
*
* @param statusCode status code
* @return if the request is successful,true : success false: failure
*/
public static boolean isSuccess(String statusCode) {
if (StringUtils.isEmpty(statusCode)) {
return false;
}
int code = Integer.parseInt(statusCode);
return code >= CommonConst.MIN_SUCCESS_STATUS_CODE && code <= CommonConst.MAX_SUCCESS_STATUS_CODE;
}

/**
* implemented by subclasses, if subclass implement {@link #isNeedRetry(Set, Object)}, no need to implement the get
* code method
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.flowcontrol.common.xds.retry;

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerExceptionCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;

import java.util.Optional;

/**
* Retry Condition Manager
*
* @author zhp
* @since 2024-11-29
*/
public enum ExceptionRetryConditionType {
/**
* The type of conditional judgment for server errors
*/
SERVER_ERROR("5xx", new ServerExceptionCondition()),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),

/**
* The type of conditional judgment for Specify response headers
*/
SPECIFIC_HEADER_NAME_ERROR("retriable-headers", new SpecificHeaderNameErrorRetryCondition());

/**
* the name of retry condition
*/
private final String conditionName;

/**
* the instance of implements class for retry condition
*/
private final RetryCondition retryCondition;

ExceptionRetryConditionType(String conditionName, RetryCondition retryCondition) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
}

public String getConditionName() {
return conditionName;
}

public RetryCondition getRetryCondition() {
return retryCondition;
}

/**
* get the instance of implements class by condition name
*
* @param conditionName condition name
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (ExceptionRetryConditionType retryConditionType : ExceptionRetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@

import io.sermant.core.utils.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.condition.ClientErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ConnectErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.GatewayErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetBeforeRequestErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ResetErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.ServerErrorCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificHeaderNameErrorRetryCondition;
import io.sermant.flowcontrol.common.xds.retry.condition.SpecificStatusCodeErrorRetryCondition;
Expand All @@ -34,7 +31,7 @@
* @author zhp
* @since 2024-11-29
*/
public enum RetryConditionType {
public enum ResultRetryConditionType {
/**
* The type of conditional judgment for server errors
*/
Expand All @@ -50,21 +47,6 @@ public enum RetryConditionType {
*/
GATEWAY_ERROR("gateway-error", new GatewayErrorCondition()),

/**
* The type of conditional judgment for reset errors
*/
RESET_ERROR("reset", new ResetErrorCondition()),

/**
* The type of conditional judgment for resetting errors before request
*/
RESET_BEFORE_REQUEST_ERROR("reset-before-request", new ResetBeforeRequestErrorCondition()),

/**
* The type of conditional judgment for connect errors
*/
CONNECT_ERROR("connect-failure", new ConnectErrorRetryCondition()),

/**
* The type of conditional judgment for Specify response code
*/
Expand All @@ -85,7 +67,7 @@ public enum RetryConditionType {
*/
private final RetryCondition retryCondition;

RetryConditionType(String conditionName, RetryCondition retryCondition) {
ResultRetryConditionType(String conditionName, RetryCondition retryCondition) {
this.conditionName = conditionName;
this.retryCondition = retryCondition;
}
Expand All @@ -105,7 +87,7 @@ public RetryCondition getRetryCondition() {
* @return instance of implements class for retry condition
*/
public static Optional<RetryCondition> getRetryConditionByName(String conditionName) {
for (RetryConditionType retryConditionType : RetryConditionType.values()) {
for (ResultRetryConditionType retryConditionType : ResultRetryConditionType.values()) {
if (StringUtils.equals(retryConditionType.getConditionName(), conditionName)) {
return Optional.of(retryConditionType.getRetryCondition());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ public interface RetryCondition {
* @param statusCode response status code
* @return The result of the decision
*/
boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result);
boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ClientErrorCondition implements RetryCondition {
private static final int MAX_4XX_FAILURE = 499;

@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (StringUtils.isEmpty(statusCode)) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.sermant.flowcontrol.common.util.StringUtils;
import io.sermant.flowcontrol.common.xds.retry.RetryCondition;

import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
Expand All @@ -36,7 +35,7 @@
*/
public class ConnectErrorRetryCondition implements RetryCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
if (ex == null) {
return false;
}
Expand All @@ -51,10 +50,15 @@ public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object re
}

private boolean isConnectErrorException(Throwable ex) {
if (ex instanceof InterruptedIOException && StringUtils.contains(ex.getMessage(), "timeout")) {
return true;
if (isRequestTimeoutException(ex)) {
return false;
}
return ex instanceof SocketTimeoutException || ex instanceof ConnectException || ex instanceof TimeoutException
|| ex instanceof NoRouteToHostException;
}

private boolean isRequestTimeoutException(Throwable ex) {
return (ex instanceof SocketTimeoutException || ex instanceof TimeoutException)
&& !StringUtils.isEmpty(ex.getMessage()) && ex.getMessage().contains("Read timed out");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class GatewayErrorCondition implements RetryCondition {
private static final Set<String> GATE_WAY_FAILURE_CODE = new HashSet<>(Arrays.asList("502", "503", "504"));

@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return !StringUtils.isEmpty(statusCode) && GATE_WAY_FAILURE_CODE.contains(statusCode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public class ResetBeforeRequestErrorCondition extends ResetErrorCondition {
@Override
public boolean needRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.needRetry(retry, ex, statusCode, result);
public boolean isNeedRetry(Retry retry, Throwable ex, String statusCode, Object result) {
return XdsThreadLocalUtil.getSendByteFlag() && super.isNeedRetry(retry, ex, statusCode, result);
}
}
Loading

0 comments on commit 7a09e77

Please sign in to comment.