diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java new file mode 100644 index 0000000000..34a00a71c9 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsDeclarer.java @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.config.PluginConfigManager; +import io.sermant.flowcontrol.common.config.XdsFlowControlConfig; + +/** + * okhttp request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public abstract class AbstractXdsDeclarer extends AbstractPluginDeclarer { + private final XdsFlowControlConfig config = PluginConfigManager.getPluginConfig(XdsFlowControlConfig.class); + + @Override + public boolean isEnabled() { + return config.isEnable(); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java new file mode 100644 index 0000000000..d05e2f345f --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/AbstractXdsHttpClientInterceptor.java @@ -0,0 +1,350 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol; + +import io.github.resilience4j.retry.Retry; +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.service.xds.entity.XdsInstanceCircuitBreakers; +import io.sermant.core.service.xds.entity.XdsRequestCircuitBreakers; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.exception.InvokerWrapperException; +import io.sermant.flowcontrol.common.handler.retry.RetryContext; +import io.sermant.flowcontrol.common.handler.retry.policy.RetryPolicy; +import io.sermant.flowcontrol.common.util.StringUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.common.xds.circuit.XdsCircuitBreakerManager; +import io.sermant.flowcontrol.common.xds.handler.XdsHandler; +import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancer; +import io.sermant.flowcontrol.common.xds.lb.XdsLoadBalancerFactory; +import io.sermant.flowcontrol.service.InterceptorSupporter; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import java.util.logging.Logger; + +/** + * Enhance the client request sending functionality by performing Xds service instance discovery and circuit breaking + * during the request sending process + * + * @author zhp + * @since 2024-11-30 + */ +public abstract class AbstractXdsHttpClientInterceptor extends InterceptorSupporter { + protected static final String MESSAGE = "CircuitBreaker has forced open and deny any requests"; + + protected static final Logger LOGGER = LoggerFactory.getLogger(); + + private static final int MIN_SUCCESS_CODE = 200; + + private static final int MAX_SUCCESS_CODE = 399; + + private static final int HUNDRED = 100; + + protected final io.sermant.flowcontrol.common.handler.retry.Retry retry; + + protected final String className; + + /** + * Constructor + * + * @param retry Retry instance, used for retry determination + * @param className The fully qualified naming of interceptors + */ + public AbstractXdsHttpClientInterceptor(io.sermant.flowcontrol.common.handler.retry.Retry retry, String className) { + this.retry = retry; + this.className = className; + } + + /** + * Perform circuit breaker judgment and handling + * + * @return The result of whether circuit breaking is needed + */ + public boolean isNeedCircuitBreak() { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null || StringUtils.isEmpty(scenarioInfo.getServiceName()) + || StringUtils.isEmpty(scenarioInfo.getClusterName()) + || StringUtils.isEmpty(scenarioInfo.getAddress())) { + return false; + } + Optional circuitBreakersOptional = XdsHandler.INSTANCE. + getRequestCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (!circuitBreakersOptional.isPresent()) { + return false; + } + int activeRequestNum = XdsCircuitBreakerManager.incrementActiveRequests(scenarioInfo.getServiceName(), + scenarioInfo.getClusterName(), scenarioInfo.getAddress()); + int maxRequest = circuitBreakersOptional.get().getMaxRequests(); + return maxRequest > 0 && activeRequestNum > maxRequest; + } + + /** + * Execute method invocation and retry logic + * + * @param context The execution context of the Interceptor + */ + public void executeWithRetryPolicy(ExecuteContext context) { + Object result = context.getResult(); + Throwable ex = context.getThrowable(); + + // Create logical function for service invocation or retry + final Supplier retryFunc = createRetryFunc(context, result); + RetryContext.INSTANCE.markRetry(retry); + try { + // first execution taking over the host logic + result = retryFunc.get(); + } catch (Throwable throwable) { + ex = throwable; + log(throwable); + } + context.afterMethod(result, ex); + try { + final List handlers = getRetryHandlers(); + + // Determine whether retry is necessary + if (!handlers.isEmpty() && needRetry(handlers.get(0), result, ex)) { + // execute retry logic + result = handlers.get(0).executeCheckedSupplier(retryFunc::get); + } + context.skip(result); + } catch (Throwable throwable) { + LOGGER.warning(String.format(Locale.ENGLISH, + "Failed to invoke method:%s for few times, reason:%s", + context.getMethod().getName(), getExMsg(throwable))); + context.setThrowableOut(getRealCause(throwable)); + } finally { + RetryContext.INSTANCE.remove(); + } + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + XdsThreadLocalUtil.removeSendByteFlag(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (context.getThrowable() == null && scenarioInfo != null) { + decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + } + chooseHttpService().onAfter(className, context); + return context; + } + + @Override + public ExecuteContext doThrow(ExecuteContext context) { + XdsThreadLocalUtil.removeSendByteFlag(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo != null) { + decrementActiveRequestsAndCountFailureRequests(context, scenarioInfo); + } + chooseHttpService().onAfter(className, context); + return context; + } + + private void decrementActiveRequestsAndCountFailureRequests(ExecuteContext context, + FlowControlScenario scenarioInfo) { + XdsCircuitBreakerManager.decrementActiveRequests(scenarioInfo.getServiceName(), scenarioInfo.getServiceName(), + scenarioInfo.getAddress()); + int statusCode = getStatusCode(context); + if (statusCode >= MIN_SUCCESS_CODE && statusCode <= MAX_SUCCESS_CODE) { + return; + } + handleFailedRequests(scenarioInfo, statusCode); + } + + /** + * handle failure request + * + * @param statusCode response code + * @param scenario scenario information + */ + private void handleFailedRequests(FlowControlScenario scenario, int statusCode) { + XdsCircuitBreakerManager.decrementActiveRequests(scenario.getServiceName(), scenario.getClusterName(), + scenario.getAddress()); + Optional instanceCircuitBreakersOptional = XdsHandler.INSTANCE. + getInstanceCircuitBreakers(scenario.getServiceName(), scenario.getClusterName()); + if (!instanceCircuitBreakersOptional.isPresent()) { + return; + } + XdsInstanceCircuitBreakers circuitBreakers = instanceCircuitBreakersOptional.get(); + XdsCircuitBreakerManager.recordFailureRequest(scenario, scenario.getAddress(), statusCode, circuitBreakers); + XdsCircuitBreakerManager.setCircuitBeakerStatus(circuitBreakers, scenario); + } + + /** + * Get status code + * + * @param context The execution context of the Interceptor + * @return response code + */ + protected abstract int getStatusCode(ExecuteContext context); + + /** + * choose serviceInstance by xds rule + * + * @return result + */ + protected Optional chooseServiceInstanceForXds() { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + if (scenarioInfo == null || io.sermant.core.utils.StringUtils.isBlank(scenarioInfo.getServiceName()) + || io.sermant.core.utils.StringUtils.isEmpty(scenarioInfo.getClusterName())) { + return Optional.empty(); + } + Set serviceInstanceSet = XdsHandler.INSTANCE. + getMatchedServiceInstance(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (serviceInstanceSet.isEmpty()) { + return Optional.empty(); + } + boolean needRetry = RetryContext.INSTANCE.isPolicyNeedRetry(); + if (needRetry) { + removeRetriedServiceInstance(serviceInstanceSet); + } + removeCircuitBreakerInstance(scenarioInfo, serviceInstanceSet); + return Optional.ofNullable(chooseServiceInstanceByLoadBalancer(serviceInstanceSet, scenarioInfo)); + } + + private void removeRetriedServiceInstance(Set serviceInstanceSet) { + RetryPolicy retryPolicy = RetryContext.INSTANCE.getRetryPolicy(); + retryPolicy.retryMark(); + Set retriedInstance = retryPolicy.getAllRetriedInstance(); + Set allInstance = new HashSet<>(serviceInstanceSet); + for (Object retryInstance : retriedInstance) { + if (retryInstance instanceof ServiceInstance) { + serviceInstanceSet.remove(retryInstance); + } + } + if (CollectionUtils.isEmpty(serviceInstanceSet)) { + serviceInstanceSet.addAll(allInstance); + } + } + + private ServiceInstance chooseServiceInstanceByLoadBalancer(Set instanceSet, + FlowControlScenario scenarioInfo) { + XdsLoadBalancer loadBalancer = XdsLoadBalancerFactory.getLoadBalancer(scenarioInfo.getServiceName(), + scenarioInfo.getClusterName()); + return loadBalancer.selectInstance(new ArrayList<>(instanceSet)); + } + + private void removeCircuitBreakerInstance(FlowControlScenario scenarioInfo, Set instanceSet) { + Optional instanceCircuitBreakersOptional = XdsHandler.INSTANCE. + getInstanceCircuitBreakers(scenarioInfo.getServiceName(), scenarioInfo.getClusterName()); + if (!instanceCircuitBreakersOptional.isPresent()) { + return; + } + XdsInstanceCircuitBreakers outlierDetection = instanceCircuitBreakersOptional.get(); + int count = instanceSet.size(); + if (checkMinInstanceNum(outlierDetection, count)) { + return; + } + List circuitBreakerInstances = new ArrayList<>(); + float maxCircuitBreakerPercent = (float) outlierDetection.getMaxEjectionPercent() / HUNDRED; + int maxCircuitBreakerInstances = (int) Math.floor(count * maxCircuitBreakerPercent); + for (ServiceInstance serviceInstance : instanceSet) { + if (hasReachedCircuitBreakerThreshold(circuitBreakerInstances, maxCircuitBreakerInstances)) { + break; + } + String address = serviceInstance.getHost() + CommonConst.CONNECT + serviceInstance.getPort(); + if (XdsCircuitBreakerManager.needsInstanceCircuitBreaker(scenarioInfo, address)) { + circuitBreakerInstances.add(serviceInstance); + } + } + if (checkHealthInstanceNum(count, outlierDetection, circuitBreakerInstances.size())) { + return; + } + circuitBreakerInstances.forEach(instanceSet::remove); + } + + private boolean hasReachedCircuitBreakerThreshold(List circuitBreakerInstances, + int maxCircuitBreakerInstances) { + return circuitBreakerInstances.size() >= maxCircuitBreakerInstances; + } + + private boolean checkHealthInstanceNum(int count, XdsInstanceCircuitBreakers outlierDetection, int size) { + return count * outlierDetection.getMinHealthPercent() >= (count - size); + } + + private boolean checkMinInstanceNum(XdsInstanceCircuitBreakers outlierDetection, int count) { + return outlierDetection.getFailurePercentageMinimumHosts() > count; + } + + /** + * Get Retry Handler + * + * @return Retry Handlers + */ + protected List getRetryHandlers() { + if (XdsThreadLocalUtil.getScenarioInfo() != null) { + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + RetryContext.INSTANCE.buildXdsRetryPolicy(scenarioInfo); + return getRetryHandler().getXdsHandlers(scenarioInfo); + } + return Collections.emptyList(); + } + + /** + * create retry method + * + * @param context The execution context of the Interceptor + * @param result The call result of the enhanced method + * @return Define Supplier for retry calls of service calls + * @throws InvokerWrapperException InvokerWrapperException + */ + protected Supplier createRetryFunc(ExecuteContext context, Object result) { + Object obj = context.getObject(); + Method method = context.getMethod(); + Object[] allArguments = context.getArguments(); + final AtomicBoolean isFirstInvoke = new AtomicBoolean(true); + return () -> { + method.setAccessible(true); + try { + preRetry(obj, method, allArguments, result, isFirstInvoke.get()); + Object invokeResult = method.invoke(obj, allArguments); + isFirstInvoke.compareAndSet(true, false); + return invokeResult; + } catch (IllegalAccessException ignored) { + isFirstInvoke.compareAndSet(true, false); + } catch (InvocationTargetException ex) { + isFirstInvoke.compareAndSet(true, false); + throw new InvokerWrapperException(ex.getTargetException()); + } + return result; + }; + } + + /** + * Pre-processing for retry calls + * + * @param obj enhancement class + * @param method target method + * @param allArguments method parameter + * @param result default result + * @param isFirstInvoke Is this the first invocation + */ + protected abstract void preRetry(Object obj, Method method, Object[] allArguments, Object result, + boolean isFirstInvoke); +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/ErrorCloseableHttpResponse.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/ErrorCloseableHttpResponse.java new file mode 100644 index 0000000000..2cbcef9481 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/inject/ErrorCloseableHttpResponse.java @@ -0,0 +1,199 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.inject; + +import org.apache.http.Header; +import org.apache.http.HeaderIterator; +import org.apache.http.HttpEntity; +import org.apache.http.HttpVersion; +import org.apache.http.ProtocolVersion; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; +import org.apache.http.message.BasicHeaderIterator; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpParams; + +import java.io.IOException; +import java.util.Locale; + +/** + * httpclient response + * + * @author zhouss + * @since 2024-12-20 + */ +public class ErrorCloseableHttpResponse implements CloseableHttpResponse { + private final int statusCode; + + private final String message; + + private final ProtocolVersion protocolVersion; + + /** + * Constructor + * + * @param statusCode Response code + * @param message error msg + * @param protocolVersion Request an agreement version + */ + public ErrorCloseableHttpResponse(int statusCode, String message, ProtocolVersion protocolVersion) { + this.statusCode = statusCode; + this.message = message; + this.protocolVersion = protocolVersion; + } + + @Override + public void close() throws IOException { + + } + + @Override + public StatusLine getStatusLine() { + return new BasicStatusLine(HttpVersion.HTTP_1_1, this.statusCode, this.message); + } + + @Override + public void setStatusLine(StatusLine statusline) { + + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code) { + + } + + @Override + public void setStatusLine(ProtocolVersion ver, int code, String reason) { + + } + + @Override + public void setStatusCode(int code) throws IllegalStateException { + + } + + @Override + public void setReasonPhrase(String reason) throws IllegalStateException { + + } + + @Override + public HttpEntity getEntity() { + return new StringEntity(message == null ? "unKnow error" : message, ContentType.APPLICATION_JSON); + } + + @Override + public void setEntity(HttpEntity entity) { + } + + @Override + public Locale getLocale() { + return Locale.ENGLISH; + } + + @Override + public void setLocale(Locale loc) { + } + + @Override + public ProtocolVersion getProtocolVersion() { + return protocolVersion; + } + + @Override + public boolean containsHeader(String name) { + return false; + } + + @Override + public Header[] getHeaders(String name) { + return new Header[0]; + } + + @Override + public Header getFirstHeader(String name) { + return new BasicHeader("type", "SermantErrorResponse"); + } + + @Override + public Header getLastHeader(String name) { + return getFirstHeader(name); + } + + @Override + public Header[] getAllHeaders() { + return new Header[0]; + } + + @Override + public void addHeader(Header header) { + + } + + @Override + public void addHeader(String name, String value) { + + } + + @Override + public void setHeader(Header header) { + + } + + @Override + public void setHeader(String name, String value) { + + } + + @Override + public void setHeaders(Header[] headers) { + + } + + @Override + public void removeHeader(Header header) { + + } + + @Override + public void removeHeaders(String name) { + + } + + @Override + public HeaderIterator headerIterator() { + return this.headerIterator("errorHeaders"); + } + + @Override + public HeaderIterator headerIterator(String name) { + return new BasicHeaderIterator(new Header[0], name); + } + + @Override + public HttpParams getParams() { + return new BasicHttpParams(); + } + + @Override + public void setParams(HttpParams params) { + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java index 6d72cd5136..8b0c31bd62 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/FeignRequestInterceptor.java @@ -114,7 +114,7 @@ protected final ExecuteContext doBefore(ExecuteContext context) throws Exception .request(request) .build()); } else { - tryExeWithRetry(context); + executeWithRetryPolicy(context); } return context; } @@ -130,7 +130,7 @@ private Request getRequest(ExecuteContext context) { return newRequest; } - private void tryExeWithRetry(ExecuteContext context) { + private void executeWithRetryPolicy(ExecuteContext context) { final Object[] allArguments = context.getArguments(); Request request = (Request) allArguments[0]; Object result = context.getResult(); diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java new file mode 100644 index 0000000000..0f2f85b640 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpClientConnectionSendHeaderDeclarer.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * http client request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public class HttpClientConnectionSendHeaderDeclarer extends AbstractXdsDeclarer { + /** + * the fully qualified name of the enhanced class + */ + private static final String[] ENHANCE_CLASS = {"org.apache.hc.core5.http.impl.io.DefaultBHttpClientConnection", + "org.apache.http.impl.DefaultBHttpClientConnection"}; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("sendRequestHeader"), + new HttpRequestSendHeaderInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java index 3744f541b9..1936630b17 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestInterceptor.java @@ -27,7 +27,6 @@ import io.sermant.flowcontrol.common.entity.RequestEntity.RequestType; import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; import io.sermant.flowcontrol.common.handler.retry.RetryContext; -import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; import io.sermant.flowcontrol.inject.DefaultClientHttpResponse; import io.sermant.flowcontrol.inject.RetryClientHttpResponse; import io.sermant.flowcontrol.service.InterceptorSupporter; @@ -99,12 +98,12 @@ protected final ExecuteContext doBefore(ExecuteContext context) { if (flowControlResult.isSkip()) { context.skip(new DefaultClientHttpResponse(flowControlResult)); } else { - tryExeWithRetry(context, httpRequestEntity.get()); + executeWithRetryPolicy(context, httpRequestEntity.get()); } return context; } - private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpRequestEntity) { + private void executeWithRetryPolicy(ExecuteContext context, HttpRequestEntity httpRequestEntity) { final Object[] allArguments = context.getArguments(); final HttpRequest request = (HttpRequest) context.getObject(); Object result; @@ -143,7 +142,6 @@ private void tryExeWithRetry(ExecuteContext context, HttpRequestEntity httpReque @Override protected ExecuteContext doThrow(ExecuteContext context) { chooseHttpService().onThrow(className, context.getThrowable()); - XdsThreadLocalUtil.removeSendByteFlag(); return context; } @@ -156,7 +154,6 @@ protected final ExecuteContext doAfter(ExecuteContext context) throws IOExceptio chooseHttpService().onThrow(className, defaultException); } chooseHttpService().onAfter(className, context.getResult()); - XdsThreadLocalUtil.removeSendByteFlag(); return context; } diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java new file mode 100644 index 0000000000..b5b7395f8c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/HttpRequestSendHeaderInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.plugin.agent.interceptor.Interceptor; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +/** + * Enhance the request header sending method to include an indicator of whether the request byte stream has been sent + * to the server + * + * @author zhp + * @since 2024-11-30 + */ +public class HttpRequestSendHeaderInterceptor implements Interceptor { + @Override + public ExecuteContext before(ExecuteContext context) throws Exception { + return context; + } + + @Override + public ExecuteContext after(ExecuteContext context) throws Exception { + XdsThreadLocalUtil.setSendByteFlag(true); + return context; + } + + @Override + public ExecuteContext onThrow(ExecuteContext context) throws Exception { + return context; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java new file mode 100644 index 0000000000..d84df73e1c --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/OkHttpSendHeaderDeclarer.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * okhttp request declarer + * + * @author zhp + * @since 2024-11-30 + */ +public class OkHttpSendHeaderDeclarer extends AbstractXdsDeclarer { + /** + * the fully qualified name of the enhanced class + */ + private static final String[] ENHANCE_CLASS = {"com.squareup.okhttp.internal.http.Http1xStream", + "com.squareup.okhttp.internal.http.Http2xStream, okhttp3.internal.connection.Exchange"}; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASS); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("writeRequestHeaders"), + new HttpRequestSendHeaderInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java new file mode 100644 index 0000000000..f260e01b5e --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xDeclarer.java @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * For HTTP requests, obtain the instance list from the registry to intercept the

4.x version only

+ * + * @author zhp + * @since 2024-12-20 + */ +public class HttpClient4xDeclarer extends AbstractXdsDeclarer { + /** + * Fully qualified HTTP requests for enhanced classes + */ + private static final String[] ENHANCE_CLASSES = { + "org.apache.http.impl.client.AbstractHttpClient", + "org.apache.http.impl.client.DefaultRequestDirector", + "org.apache.http.impl.client.InternalHttpClient", + "org.apache.http.impl.client.MinimalHttpClient" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains("doExecute", "execute") + .and(MethodMatcher.paramTypesEqual( + "org.apache.http.HttpHost", + "org.apache.http.HttpRequest", + "org.apache.http.protocol.HttpContext")), + new HttpClient4xInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java new file mode 100644 index 0000000000..089cde0016 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpClient4xInterceptor.java @@ -0,0 +1,227 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import io.sermant.flowcontrol.inject.ErrorCloseableHttpResponse; + +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.util.EntityUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * HTTP interception only for version 4. x + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpClient4xInterceptor extends AbstractXdsHttpClientInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + /** + * Constructor + */ + public HttpClient4xInterceptor() { + super(new HttpClientRetry(), HttpClient4xInterceptor.class.getName()); + } + + /** + * Pre-trigger point + * + * @param context Execution context + * @return Execution context + */ + @Override + public ExecuteContext doBefore(ExecuteContext context) { + Object[] arguments = context.getArguments(); + Object httpRequestObject = arguments[1]; + if (!(httpRequestObject instanceof HttpRequestBase)) { + return context; + } + final HttpRequestBase httpRequest = (HttpRequestBase) httpRequestObject; + + // Parse the service name, request path, and request header in the request information and convert them into + // a request entity class + final Optional httpRequestEntity = convertToValidHttpEntity(httpRequest); + if (!httpRequestEntity.isPresent()) { + return context; + } + final FlowControlResult flowControlResult = new FlowControlResult(); + + // Execute the flow control handler chain, with only fault for XDS + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + + // When triggering some flow control rules, it is necessary to skip execution and return the result directly + if (flowControlResult.isSkip()) { + context.skip(new ErrorCloseableHttpResponse(flowControlResult.getResponse().getCode(), + flowControlResult.buildResponseMsg(), httpRequest.getProtocolVersion())); + return context; + } + + // Determine whether the number of requests has reached the threshold, and trigger flow control when the + // threshold is reached + if (isNeedCircuitBreak()) { + context.skip(new ErrorCloseableHttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR, MESSAGE, + httpRequest.getProtocolVersion())); + } + + // Execute service invocation and retry logic + executeWithRetryPolicy(context); + return context; + } + + private Optional convertToValidHttpEntity(HttpRequestBase httpRequest) { + if (httpRequest == null || httpRequest.getURI() == null) { + return Optional.empty(); + } + URI uri = httpRequest.getURI(); + String host = uri.getHost(); + String serviceName = host.split(CommonConst.ESCAPED_POINT)[0]; + if (!XdsRouterUtils.isXdsRouteRequired(serviceName)) { + return Optional.empty(); + } + final Map headers = getHeaders(httpRequest); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.getPath()).setHeaders(headers) + .setMethod(httpRequest.getMethod()) + .setServiceName(serviceName) + .build()); + } + + private Map getHeaders(HttpRequest httpRequest) { + Map headerMap = new HashMap<>(); + for (Header header : httpRequest.getAllHeaders()) { + headerMap.putIfAbsent(header.getName(), header.getValue()); + } + return headerMap; + } + + @Override + public int getStatusCode(ExecuteContext context) { + Optional statusCodeOptional = retry.getCode(context.getResult()); + return statusCodeOptional.map(Integer::parseInt).orElse(CommonConst.DEFAULT_RESPONSE_CODE); + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + tryClose(result); + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return; + } + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + final HttpRequestBase httpRequest = (HttpRequestBase) allArguments[1]; + try { + httpRequest.setURI(new URI(XdsRouterUtils.rebuildUrlByXdsServiceInstance(httpRequest.getURI(), instance))); + allArguments[0] = new HttpHost(instance.getHost(), instance.getPort()); + } catch (URISyntaxException e) { + LOGGER.log(Level.WARNING, "Create uri using xds service instance failed.", e.getMessage()); + } + } + + private void tryClose(Object result) { + if (!(result instanceof HttpResponse)) { + return; + } + HttpResponse httpResponse = (HttpResponse) result; + try { + try { + EntityUtils.consume(httpResponse.getEntity()); + } finally { + if (httpResponse instanceof Closeable) { + ((Closeable) httpResponse).close(); + } + } + } catch (IOException ex) { + LOGGER.severe("An exception occurred when attempting to close the httpResponse."); + } + } + + /** + * Http Client retry + * + * @since 2022-02-21 + */ + public static class HttpClientRetry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof CloseableHttpResponse)) { + return Optional.empty(); + } + CloseableHttpResponse httpResponse = (CloseableHttpResponse) result; + if (httpResponse.getStatusLine() == null) { + return Optional.empty(); + } + return Optional.of(String.valueOf(httpResponse.getStatusLine().getStatusCode())); + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof CloseableHttpResponse)) { + return Optional.empty(); + } + CloseableHttpResponse httpResponse = (CloseableHttpResponse) result; + Set headerNames = new HashSet<>(); + for (Header header : httpResponse.getAllHeaders()) { + headerNames.add(header.getName()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectDeclarer.java new file mode 100644 index 0000000000..f12f82cb96 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectDeclarer.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * Define the interception point information of java.net.HttpURLConnection of JDK 1.8
+ * + * @author zhp + * @since 2024-12-20 + */ +public class HttpUrlConnectionConnectDeclarer extends AbstractXdsDeclarer { + private static final String METHOD_NAME = "connect"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.isExtendedFrom("java.net.HttpURLConnection"); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new HttpUrlConnectionConnectInterceptor())}; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java new file mode 100644 index 0000000000..d70e17a5d0 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionConnectInterceptor.java @@ -0,0 +1,213 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.utils.CollectionUtils; +import io.sermant.core.utils.MapUtils; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.Proxy; +import java.net.URL; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An enhanced interceptor for java.net.HttpURLConnection in JDK version 1.8
+ * + * @author yuzl Yu Zhenlong + * @since 2024-12-20 + */ +public class HttpUrlConnectionConnectInterceptor extends AbstractXdsHttpClientInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + /** + * Constructor + */ + public HttpUrlConnectionConnectInterceptor() { + super(new HttpUrlConnectionResponseStreamInterceptor.HttpUrlConnectionRetry(), + HttpUrlConnectionConnectInterceptor.class.getName()); + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) { + if (!(context.getObject() instanceof HttpURLConnection)) { + return context; + } + HttpURLConnection connection = (HttpURLConnection) context.getObject(); + + // Parse the service name, request path, and request header in the request information and convert them into + // a request entity class + final Optional httpRequestEntity = convertToValidHttpEntity(connection); + if (!httpRequestEntity.isPresent()) { + return context; + } + final FlowControlResult flowControlResult = new FlowControlResult(); + + // Execute the flow control handler chain, with only fault for XDS + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + + // When triggering some flow control rules, it is necessary to skip execution and return the result directly + if (flowControlResult.isSkip()) { + context.setThrowableOut(new RuntimeException(flowControlResult.buildResponseMsg())); + setResponseCodeAndMessage(connection, flowControlResult.getResponse().getCode(), + flowControlResult.buildResponseMsg()); + context.skip(null); + return context; + } + + // Determine whether the number of requests has reached the threshold, and trigger flow control when the + // threshold is reached + if (isNeedCircuitBreak()) { + context.setThrowableOut(new RuntimeException(MESSAGE)); + setResponseCodeAndMessage(connection, CommonConst.INTERVAL_SERVER_ERROR, MESSAGE); + context.skip(null); + return context; + } + + // Execute service invocation and retry logic + executeWithRetryPolicy(context); + return context; + } + + private static void setResponseCodeAndMessage(HttpURLConnection connection, int code, String message) { + ReflectUtils.setFieldValue(connection, "responseCode", code); + ReflectUtils.setFieldValue(connection, "responseMessage", message); + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } + + @Override + public ExecuteContext doThrow(ExecuteContext context) { + return context; + } + + private Optional convertToValidHttpEntity(HttpURLConnection connection) { + if (connection == null || connection.getURL() == null) { + return Optional.empty(); + } + URL uri = connection.getURL(); + String host = uri.getHost(); + String serviceName = host.split(CommonConst.ESCAPED_POINT)[0]; + if (!XdsRouterUtils.isXdsRouteRequired(serviceName)) { + return Optional.empty(); + } + final Map headers = getHeaders(connection); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.getPath()).setHeaders(headers) + .setMethod(connection.getRequestMethod()) + .setServiceName(serviceName) + .build()); + } + + private Map getHeaders(HttpURLConnection connection) { + Map headerMap = new HashMap<>(); + if (MapUtils.isEmpty(connection.getRequestProperties())) { + return headerMap; + } + for (Map.Entry> header : connection.getRequestProperties().entrySet()) { + if (CollectionUtils.isEmpty(header.getValue())) { + continue; + } + headerMap.putIfAbsent(header.getKey(), header.getValue().get(0)); + } + return headerMap; + } + + @Override + public int getStatusCode(ExecuteContext context) { + HttpURLConnection connection = (HttpURLConnection) context.getObject(); + try { + return connection.getResponseCode(); + } catch (IOException io) { + LOGGER.log(Level.SEVERE, "Failed to get response code.", io); + return CommonConst.DEFAULT_RESPONSE_CODE; + } + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return; + } + HttpURLConnection connection = (HttpURLConnection) obj; + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + try { + URL url = connection.getURL(); + URL newUrl = new URL(url.getProtocol(), instance.getHost(), instance.getPort(), url.getFile()); + ReflectUtils.setFieldValue(connection, "url", newUrl); + tryResetProxy(newUrl, obj); + } catch (MalformedURLException e) { + LOGGER.log(Level.WARNING, "Create url using xds service instance failed.", e.getMessage()); + } + } + + /** + * In the scenario of specifying a proxy, you need to replace the address of the proxy with the actual downstream + * address; otherwise 404 will appear + * + * @param newUrl Actual downstream address + * @param object Enhanced object + */ + private void tryResetProxy(URL newUrl, Object object) { + final Optional instProxy = ReflectUtils.getFieldValue(object, "instProxy"); + if (!instProxy.isPresent() || !(instProxy.get() instanceof Proxy)) { + return; + } + final Proxy proxy = (Proxy) instProxy.get(); + if (proxy.type() != Proxy.Type.HTTP) { + return; + } + + // The user uses its own proxy to replace the resolved downstream address + ReflectUtils.setFieldValue(object, "instProxy", getProxy(newUrl)); + } + + private Proxy getProxy(URL newUrl) { + return createProxy(newUrl); + } + + private Proxy createProxy(URL newUrl) { + return new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(newUrl.getHost(), newUrl.getPort())); + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectDeclarer.java new file mode 100644 index 0000000000..c8bef3c6a7 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectDeclarer.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * Define the interception point information of java.net.HttpURLConnection of JDK 1.8
+ * + * @author zhp + * @since 2024-12-20 + */ +public class HttpUrlConnectionDisconnectDeclarer extends AbstractXdsDeclarer { + private static final String METHOD_NAME = "disconnect"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.isExtendedFrom("java.net.HttpURLConnection"); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new HttpUrlConnectionDisconnectInterceptor())}; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java new file mode 100644 index 0000000000..f8aa44d67b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionDisconnectInterceptor.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +import java.lang.reflect.Method; +import java.net.HttpURLConnection; + +/** + * This intercept point mainly cleans up the connection at the final stage + * + * @author zhouss + * @since 2024-12-20 + */ +public class HttpUrlConnectionDisconnectInterceptor extends AbstractXdsHttpClientInterceptor { + private static final String STATUS_CODE = "status_code"; + + /** + * Constructor + */ + public HttpUrlConnectionDisconnectInterceptor() { + super(null, HttpUrlConnectionDisconnectInterceptor.class.getCanonicalName()); + } + + @Override + public ExecuteContext doBefore(ExecuteContext context) throws Exception { + XdsThreadLocalUtil.removeSendByteFlag(); + HttpURLConnection httpUrlConnection = (HttpURLConnection) context.getObject(); + context.setLocalFieldValue(STATUS_CODE, httpUrlConnection.getResponseCode()); + return context; + } + + @Override + protected int getStatusCode(ExecuteContext context) { + Object statusCode = context.getLocalFieldValue(STATUS_CODE); + if (statusCode instanceof Integer) { + return (int) statusCode; + } + return CommonConst.DEFAULT_RESPONSE_CODE; + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamDeclarer.java new file mode 100644 index 0000000000..02bda8ea0d --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamDeclarer.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; +import io.sermant.flowcontrol.AbstractXdsDeclarer; + +/** + * Blocking HttpUrlConnection connections + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpUrlConnectionResponseStreamDeclarer extends AbstractXdsDeclarer { + private static final String METHOD_NAME = "getInputStream"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.isExtendedFrom("java.net.HttpURLConnection"); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[] { + InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME), + new HttpUrlConnectionResponseStreamInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java new file mode 100644 index 0000000000..8df2054999 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/HttpUrlConnectionResponseStreamInterceptor.java @@ -0,0 +1,214 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.common.LoggerFactory; +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.utils.MapUtils; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import sun.net.www.http.HttpClient; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.Proxy; +import java.net.URL; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Intercept the HttpUrlConnection#getInputSteam method + * + * @author zhp + * @since 2024-12-20 + */ +public class HttpUrlConnectionResponseStreamInterceptor extends AbstractXdsHttpClientInterceptor { + private static final Logger LOGGER = LoggerFactory.getLogger(); + + /** + * Constructor + */ + public HttpUrlConnectionResponseStreamInterceptor() { + super(new HttpUrlConnectionRetry(), HttpUrlConnectionResponseStreamInterceptor.class.getCanonicalName()); + } + + @Override + protected ExecuteContext doBefore(ExecuteContext context) throws Exception { + if (!(context.getObject() instanceof HttpURLConnection)) { + return context; + } + if (XdsThreadLocalUtil.getScenarioInfo() == null) { + return context; + } + executeWithRetryPolicy(context); + return context; + } + + @Override + public ExecuteContext doAfter(ExecuteContext context) { + return context; + } + + @Override + public ExecuteContext doThrow(ExecuteContext context) { + return context; + } + + @Override + protected int getStatusCode(ExecuteContext context) { + HttpURLConnection connection = (HttpURLConnection) context.getObject(); + try { + return connection.getResponseCode(); + } catch (IOException io) { + LOGGER.log(Level.SEVERE, "Failed to get response code.", io); + return CommonConst.DEFAULT_RESPONSE_CODE; + } + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + tryCloseOldInputStream(result); + if (isFirstInvoke) { + return; + } + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return; + } + HttpURLConnection connection = (HttpURLConnection) obj; + ServiceInstance instance = serviceInstanceOptional.get(); + resetStats(obj); + URL url = connection.getURL(); + URL newUrl = buildUrl(url, instance); + ReflectUtils.setFieldValue(connection, "url", newUrl); + resetHttpClient(connection, newUrl); + } + + private static URL buildUrl(URL url, ServiceInstance instance) { + try { + return new URL(url.getProtocol(), instance.getHost(), instance.getPort(), url.getFile()); + } catch (MalformedURLException e) { + LOGGER.log(Level.SEVERE, "Can not parse to URL for url.", e); + return url; + } + } + + private void resetStats(Object object) { + ReflectUtils.setFieldValue(object, "rememberedException", null); + ReflectUtils.setFieldValue(object, "failedOnce", false); + ReflectUtils.setFieldValue(object, "responseCode", CommonConst.DEFAULT_RESPONSE_CODE); + } + + private void resetHttpClient(Object target, URL url) { + final Optional http = ReflectUtils.getFieldValue(target, "http"); + if (http.isPresent() && http.get() instanceof HttpClient) { + // Close the original httpclient + ((HttpClient) http.get()).closeServer(); + } + final Optional connectTimeout = ReflectUtils.getFieldValue(target, "connectTimeout"); + final Optional readTimeout = ReflectUtils.getFieldValue(target, "readTimeout"); + if (!connectTimeout.isPresent() || !readTimeout.isPresent()) { + return; + } + final Optional instProxy = ReflectUtils.getFieldValue(target, "instProxy"); + try { + HttpClient newClient; + if (instProxy.isPresent() && instProxy.get() instanceof Proxy) { + newClient = HttpClient.New(url, (Proxy) instProxy.get(), (int) connectTimeout.get(), true, + (sun.net.www.protocol.http.HttpURLConnection) target); + } else { + newClient = HttpClient.New(url, null, (int) connectTimeout.get(), true, + (sun.net.www.protocol.http.HttpURLConnection) target); + } + newClient.setReadTimeout((int) readTimeout.get()); + ReflectUtils.setFieldValue(target, "http", newClient); + } catch (IOException e) { + LOGGER.info("Can not create httpclient when invoke!"); + } + } + + private void tryCloseOldInputStream(Object rawInputStream) { + if (rawInputStream instanceof Closeable) { + // Shut down processing for old input streams + try { + ((Closeable) rawInputStream).close(); + } catch (IOException e) { + LOGGER.warning("Close old input stream failed when invoke"); + } + } + } + + /** + * Http url connection retry + * + * @since 2022-02-21 + */ + public static class HttpUrlConnectionRetry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof HttpURLConnection)) { + return Optional.empty(); + } + HttpURLConnection connection = (HttpURLConnection) result; + try { + return Optional.of(String.valueOf(connection.getResponseCode())); + } catch (IOException io) { + LOGGER.log(Level.SEVERE, "Failed to get response code.", io); + return Optional.empty(); + } + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof HttpURLConnection)) { + return Optional.empty(); + } + HttpURLConnection connection = (HttpURLConnection) result; + Set headerNames = new HashSet<>(); + if (MapUtils.isEmpty(connection.getHeaderFields())) { + return Optional.empty(); + } + Set headers = connection.getHeaderFields().keySet(); + for (Map.Entry> header : connection.getHeaderFields().entrySet()) { + headers.add(header.getKey()); + } + return Optional.of(headerNames); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientDeclarer.java new file mode 100644 index 0000000000..bcb0082679 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientDeclarer.java @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; + +/** + * For OKHTTP requests, obtain the instance list from the registry to block them + * + * @author zhp + * @since 2024-12-20 + */ +public class OkHttp3ClientDeclarer extends AbstractPluginDeclarer { + /** + * The fully qualified name of the enhanced okhttp request + */ + private static final String[] ENHANCE_CLASSES = { + "okhttp3.RealCall", + "okhttp3.internal.connection.RealCall" + }; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameContains(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameContains("execute", "getResponseWithInterceptorChain"), + new OkHttp3ClientInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java new file mode 100644 index 0000000000..4a9b61161b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttp3ClientInterceptor.java @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.core.utils.LogUtils; +import io.sermant.core.utils.ReflectUtils; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.exception.InvokerWrapperException; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; +import okhttp3.Headers; +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * Blocking for okHttp3.x and above versions + * + * @author zhp + * @since 2024-12-20 + */ +public class OkHttp3ClientInterceptor extends AbstractXdsHttpClientInterceptor { + private static final String REQUEST_FIELD_NAME = "originalRequest"; + + private final String className = HttpClient4xInterceptor.class.getName(); + + /** + * Constructor + */ + public OkHttp3ClientInterceptor() { + super(new OkHttp3Retry(), OkHttp3ClientInterceptor.class.getCanonicalName()); + } + + @Override + protected ExecuteContext doBefore(ExecuteContext context) throws Exception { + LogUtils.printHttpRequestBeforePoint(context); + final Optional rawRequest = getRequest(context.getObject()); + if (!rawRequest.isPresent()) { + return context; + } + context.setLocalFieldValue(REQUEST_FIELD_NAME, rawRequest.get()); + Request request = rawRequest.get(); + + // Parse the service name, request path, and request header in the request information and convert them into + // a request entity class + final Optional httpRequestEntity = convertToValidHttpEntity(request); + if (!httpRequestEntity.isPresent()) { + return context; + } + final FlowControlResult flowControlResult = new FlowControlResult(); + + // Execute the flow control handler chain, with only fault for XDS + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + + // When triggering some flow control rules, it is necessary to skip execution and return the result directly + if (flowControlResult.isSkip()) { + Response.Builder builder = new Response.Builder(); + context.skip(builder.code(flowControlResult.getResponse().getCode()) + .message(flowControlResult.buildResponseMsg()).request(request).build()); + return context; + } + + // Determine whether the number of requests has reached the threshold, and trigger flow control when the + // threshold is reached + if (isNeedCircuitBreak()) { + Response.Builder builder = new Response.Builder(); + context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR) + .message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build()); + } + + // Execute service invocation and retry logic + executeWithRetryPolicy(context); + return context; + } + + private Optional getRequest(Object object) { + final Optional originalRequest = ReflectUtils.getFieldValue(object, REQUEST_FIELD_NAME); + if (originalRequest.isPresent() && originalRequest.get() instanceof Request) { + return Optional.of((Request) originalRequest.get()); + } + return Optional.empty(); + } + + private Request rebuildRequest(Request request, ServiceInstance serviceInstance) { + HttpUrl url = request.url().newBuilder() + .host(serviceInstance.getHost()) + .port(serviceInstance.getPort()) + .build(); + return request.newBuilder() + .url(url) + .build(); + } + + private Optional convertToValidHttpEntity(Request httpRequest) { + HttpUrl uri = httpRequest.url(); + String serviceName = uri.host().split(CommonConst.ESCAPED_POINT)[0]; + if (!XdsRouterUtils.isXdsRouteRequired(serviceName)) { + return Optional.empty(); + } + final Map headers = getHeaders(httpRequest); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.encodedPath()).setHeaders(headers) + .setMethod(httpRequest.method()) + .setServiceName(serviceName) + .build()); + } + + private Map getHeaders(Request httpRequest) { + Map headerMap = new HashMap<>(); + Headers headers = httpRequest.headers(); + for (String name : headers.names()) { + headerMap.putIfAbsent(name, headers.get(name)); + } + return headerMap; + } + + @Override + public int getStatusCode(ExecuteContext context) { + Optional statusCodeOptional = retry.getCode(context.getResult()); + return statusCodeOptional.map(Integer::parseInt).orElse(CommonConst.DEFAULT_RESPONSE_CODE); + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return; + } + final Optional rawRequest = getRequest(obj); + if (!rawRequest.isPresent()) { + return; + } + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + ReflectUtils.setFieldValue(obj, REQUEST_FIELD_NAME, rebuildRequest(rawRequest.get(), instance)); + } + + private Object copyNewCall(Object object, Request newRequest) { + final Optional client = ReflectUtils.getFieldValue(object, "client"); + if (!client.isPresent()) { + return object; + } + final OkHttpClient okHttpClient = (OkHttpClient) client.get(); + return okHttpClient.newCall(newRequest); + } + + @Override + public Supplier createRetryFunc(ExecuteContext context, Object result) { + Object obj = context.getObject(); + Method method = context.getMethod(); + Object[] allArguments = context.getArguments(); + final AtomicBoolean isFirstInvoke = new AtomicBoolean(true); + return () -> { + method.setAccessible(true); + try { + Request request = (Request) context.getLocalFieldValue(REQUEST_FIELD_NAME); + preRetry(obj, method, allArguments, result, isFirstInvoke.get()); + Object newCall = copyNewCall(obj, request); + Object invokeResult = method.invoke(newCall, allArguments); + isFirstInvoke.compareAndSet(true, false); + return invokeResult; + } catch (IllegalAccessException ignored) { + // ignored + } catch (InvocationTargetException ex) { + throw new InvokerWrapperException(ex.getTargetException()); + } + return result; + }; + } + + /** + * OkHttp3 retry + * + * @since 2022-02-21 + */ + public static class OkHttp3Retry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof Response)) { + return Optional.empty(); + } + Response httpResponse = (Response) result; + return Optional.of(String.valueOf(httpResponse.code())); + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof Response)) { + return Optional.empty(); + } + Response httpResponse = (Response) result; + return Optional.of(httpResponse.headers().names()); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainDeclarer.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainDeclarer.java new file mode 100644 index 0000000000..a616fd169b --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainDeclarer.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import io.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer; +import io.sermant.core.plugin.agent.declarer.InterceptDeclarer; +import io.sermant.core.plugin.agent.matcher.ClassMatcher; +import io.sermant.core.plugin.agent.matcher.MethodMatcher; + +/** + * For OKHTTP requests, modify the URL of request + * + * @author zhp + * @since 2024-12-20 + */ +public class OkHttpClientInterceptorChainDeclarer extends AbstractPluginDeclarer { + private static final String ENHANCE_CLASSES = + "com.squareup.okhttp.Call$ApplicationInterceptorChain"; + + @Override + public ClassMatcher getClassMatcher() { + return ClassMatcher.nameEquals(ENHANCE_CLASSES); + } + + @Override + public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) { + return new InterceptDeclarer[]{ + InterceptDeclarer.build(MethodMatcher.nameEquals("proceed"), + new OkHttpClientInterceptorChainInterceptor()) + }; + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java new file mode 100644 index 0000000000..b6543cf4c6 --- /dev/null +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/java/io/sermant/flowcontrol/retry/client/OkHttpClientInterceptorChainInterceptor.java @@ -0,0 +1,190 @@ +/* + * Copyright (C) 2024-2024 Sermant Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.sermant.flowcontrol.retry.client; + +import com.squareup.okhttp.Headers; +import com.squareup.okhttp.HttpUrl; +import com.squareup.okhttp.Protocol; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; + +import io.sermant.core.plugin.agent.entity.ExecuteContext; +import io.sermant.core.service.xds.entity.ServiceInstance; +import io.sermant.flowcontrol.AbstractXdsHttpClientInterceptor; +import io.sermant.flowcontrol.common.config.CommonConst; +import io.sermant.flowcontrol.common.entity.FlowControlResult; +import io.sermant.flowcontrol.common.entity.FlowControlScenario; +import io.sermant.flowcontrol.common.entity.HttpRequestEntity; +import io.sermant.flowcontrol.common.entity.RequestEntity; +import io.sermant.flowcontrol.common.handler.retry.AbstractRetry; +import io.sermant.flowcontrol.common.util.XdsRouterUtils; +import io.sermant.flowcontrol.common.util.XdsThreadLocalUtil; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Level; + +/** + * Intercept for versions below okHttp3.1 + * + * @author zhp + * @since 2024-12-20 + */ +public class OkHttpClientInterceptorChainInterceptor extends AbstractXdsHttpClientInterceptor { + private final String className = HttpClient4xInterceptor.class.getName(); + + /** + * Constructor + */ + public OkHttpClientInterceptorChainInterceptor() { + super(new OkHttpRetry(), OkHttpClientInterceptorChainInterceptor.class.getCanonicalName()); + } + + @Override + protected ExecuteContext doBefore(ExecuteContext context) throws Exception { + Object[] arguments = context.getArguments(); + if (!(arguments[0] instanceof Request)) { + return context; + } + Request request = (Request) arguments[0]; + + // Parse the service name, request path, and request header in the request information and convert them into + // a request entity class + final Optional httpRequestEntity = convertToValidHttpEntity(request); + if (!httpRequestEntity.isPresent()) { + return context; + } + final FlowControlResult flowControlResult = new FlowControlResult(); + + // Execute the flow control handler chain, with only fault for XDS + chooseHttpService().onBefore(className, httpRequestEntity.get(), flowControlResult); + + // When triggering some flow control rules, it is necessary to skip execution and return the result directly + if (flowControlResult.isSkip()) { + Response.Builder builder = new Response.Builder(); + context.skip(builder.code(flowControlResult.getResponse().getCode()) + .message(flowControlResult.buildResponseMsg()).request(request) + .protocol(Protocol.HTTP_1_1).build()); + return context; + } + + // Determine whether the number of requests has reached the threshold, and trigger flow control when the + // threshold is reached + if (isNeedCircuitBreak()) { + Response.Builder builder = new Response.Builder(); + context.skip(builder.code(CommonConst.INTERVAL_SERVER_ERROR) + .message(MESSAGE).request(request).protocol(Protocol.HTTP_1_1).build()); + } + + // Execute service invocation and retry logic + executeWithRetryPolicy(context); + return context; + } + + private Optional convertToValidHttpEntity(Request httpRequest) { + HttpUrl uri = httpRequest.httpUrl(); + String serviceName = uri.host().split(CommonConst.ESCAPED_POINT)[0]; + if (!XdsRouterUtils.isXdsRouteRequired(serviceName)) { + return Optional.empty(); + } + final Map headers = getHeaders(httpRequest); + return Optional.of(new HttpRequestEntity.Builder() + .setRequestType(RequestEntity.RequestType.CLIENT) + .setApiPath(uri.encodedPath()).setHeaders(headers) + .setMethod(httpRequest.method()) + .setServiceName(serviceName) + .build()); + } + + private Map getHeaders(Request httpRequest) { + Map headerMap = new HashMap<>(); + Headers headers = httpRequest.headers(); + for (String name : headers.names()) { + headerMap.putIfAbsent(name, headers.get(name)); + } + return headerMap; + } + + @Override + public int getStatusCode(ExecuteContext context) { + Optional statusCodeOptional = retry.getCode(context.getResult()); + return statusCodeOptional.map(Integer::parseInt).orElse(CommonConst.DEFAULT_RESPONSE_CODE); + } + + @Override + protected void preRetry(Object obj, Method method, Object[] allArguments, Object result, boolean isFirstInvoke) { + Optional serviceInstanceOptional = chooseServiceInstanceForXds(); + if (!serviceInstanceOptional.isPresent()) { + return; + } + Request request = (Request) allArguments[0]; + ServiceInstance instance = serviceInstanceOptional.get(); + FlowControlScenario scenarioInfo = XdsThreadLocalUtil.getScenarioInfo(); + scenarioInfo.setAddress(instance.getHost() + CommonConst.CONNECT + instance.getPort()); + allArguments[0] = rebuildRequest(request, request.url(), instance); + } + + private Request rebuildRequest(Request request, URL url, ServiceInstance instance) { + try { + URL newUrl = new URL(url.getProtocol(), instance.getHost(), instance.getPort(), url.getFile()); + return request.newBuilder().url(newUrl).build(); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Convert url string to url failed.", e.getMessage()); + return request; + } + } + + /** + * OkHttp3 retry + * + * @since 2022-02-21 + */ + public static class OkHttpRetry extends AbstractRetry { + @Override + public Optional getCode(Object result) { + if (!(result instanceof Response)) { + return Optional.empty(); + } + Response httpResponse = (Response) result; + return Optional.of(String.valueOf(httpResponse.code())); + } + + @Override + public Optional> getHeaderNames(Object result) { + if (!(result instanceof Response)) { + return Optional.empty(); + } + Response httpResponse = (Response) result; + return Optional.of(httpResponse.headers().names()); + } + + @Override + public Class[] retryExceptions() { + return getRetryExceptions(); + } + + @Override + public RetryFramework retryType() { + return RetryFramework.SPRING; + } + } +} diff --git a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer index 49385b8e05..4e692c9553 100644 --- a/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer +++ b/sermant-plugins/sermant-flowcontrol/flowcontrol-plugin/src/main/resources/META-INF/services/io.sermant.core.plugin.agent.declarer.PluginDeclarer @@ -31,3 +31,12 @@ io.sermant.flowcontrol.retry.HttpRequestDeclarer io.sermant.flowcontrol.retry.SpringRibbonLbDeclarer io.sermant.flowcontrol.retry.SpringLbDeclarer io.sermant.flowcontrol.ClusterDeclarer +io.sermant.flowcontrol.retry.OkHttpSendHeaderDeclarer +io.sermant.flowcontrol.retry.HttpClientConnectionSendHeaderDeclarer +#HttpClient +io.sermant.flowcontrol.retry.client.OkHttpClientInterceptorChainDeclarer +io.sermant.flowcontrol.retry.client.HttpClient4xDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionConnectDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionDisconnectDeclarer +io.sermant.flowcontrol.retry.client.OkHttp3ClientDeclarer +io.sermant.flowcontrol.retry.client.HttpUrlConnectionResponseStreamDeclarer