Skip to content

Commit

Permalink
support dubbo3.x rateLimiting/circuitBreaker
Browse files Browse the repository at this point in the history
Signed-off-by: chengyouling <[email protected]>
  • Loading branch information
chengyouling committed Feb 27, 2025
1 parent f682ee5 commit d70612a
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 6 deletions.
15 changes: 15 additions & 0 deletions .github/actions/common/plugin-change-check/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,14 @@ runs:
id: changed-dubbo-common-action
with:
paths: ./.github/actions/scenarios/dubbo/dubbo-common ./.github/actions/common/dubbo ./.github/workflows/dubbo_integration_test.yml
- uses: ktamas77/[email protected]
id: changed-dubbo3-flow-action
with:
paths: ./.github/actions/scenarios/dubbo/flow-dubbo3 ./.github/actions/common/dubbo ./.github/workflows/dubbo_integration_test.yml
- name: env dubbo3-flow-action
shell: bash
run: |
echo "enableDubbo3FlowActionChange=${{ steps.changed-dubbo3-flow-action.outputs.changed }}" >> $GITHUB_ENV
- name: env dubbo-router-action
shell: bash
run: |
Expand Down Expand Up @@ -678,6 +686,13 @@ runs:
echo "enableDubboLane=true" >> $GITHUB_ENV
echo "enableDubbo3Lane=true" >> $GITHUB_ENV
fi
# ==========dubbo3 flow is needed to test?==========
if [ ${{ env.sermantAgentCoreChanged }} == 'true' -o ${{ env.sermantFlowcontrolChanged }} == 'true' -o \
${{ env.enableDubbo3FlowActionChange }} == 'true' -o \
${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' -o \
${{ env.enableDubboTestChange }} == 'true' ];then
echo "enableDubbo3Flow=true" >> $GITHUB_ENV
fi
# *****************spring_integration_test_1.yml*****************
# ==========graceful is needed to test?==========
Expand Down
68 changes: 68 additions & 0 deletions .github/actions/scenarios/dubbo/dubbo3-flow/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
name: "Dubbo3 Flow Test"
description: "Auto test for dubbo3 flow"
runs:
using: "composite"
steps:
- name: entry
uses: ./.github/actions/common/entry
with:
log-dir: ./logs/dubbo3-flow
- name: package dubbo 3.0.x tests
shell: bash
if: matrix.dubbo-version == '3-0'
run: mvn package -Ddubbo.version=3.0.${{ matrix.dubbo-versions }} -DskipTests --file sermant-integration-tests/dubbo-test/pom.xml
- name: package dubbo 3.1.x tests
shell: bash
if: matrix.dubbo-version == '3-1'
run: mvn package -Ddubbo.version=3.1.${{ matrix.dubbo-versions }} -DskipTests --file sermant-integration-tests/dubbo-test/pom.xml
- name: package dubbo 3.2.x tests
shell: bash
if: matrix.dubbo-version == '3-2'
run: mvn package -Ddubbo.version=3.2.${{ matrix.dubbo-versions }} -DskipTests --file sermant-integration-tests/dubbo-test/pom.xml
- name: start provider service
shell: bash
env:
SERVER_PORT: 28022
DUBBO_PROTOCOL_PORT: 28822
run: |
nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=dubbo-integration-provider -jar \
sermant-integration-tests/dubbo-test/dubbo-${{ env.projectPath }}-integration-provider/target/dubbo-integration-provider.jar > ${{ env.logDir }}/dubbo-provider.log 2>&1 &
- name: waiting for providers start
shell: bash
run: |
bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:28022/actuator/health 120
- name: start consumer service
shell: bash
env:
SERVER_PORT: 28020
DUBBO_PROTOCOL_PORT: 28820
run: |
nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=dubbo-integration-consumer -jar \
sermant-integration-tests/dubbo-test/dubbo-${{ env.projectPath }}-integration-consumer/target/dubbo-integration-consumer.jar > ${{ env.logDir }}/dubbo-consumer.log 2>&1 &
- name: waiting for consumers start
shell: bash
run: |
bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:28020/actuator/health 120
- name: integration test
shell: bash
env:
TEST_TYPE: dubbo3-flow
run: mvn test --file sermant-integration-tests/dubbo-test/pom.xml
- name: mvn clean
shell: bash
run: mvn clean --file sermant-integration-tests/dubbo-test/pom.xml
- name: exit
if: always()
uses: ./.github/actions/common/exit
with:
processor-keyword: dubbo
- name: if failure then upload error log
uses: actions/upload-artifact@v4
if: ${{ failure() || cancelled() }}
with:
name: (test-for-dubbo3-flow)-(${{ matrix.dubbo-version }}-${{ matrix.dubbo-versions }})-logs
path: |
./*.log
./logs/**/*.log
if-no-files-found: warn
retention-days: 2
7 changes: 6 additions & 1 deletion .github/workflows/dubbo_integration_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
echo "enableDubboRemoval=${{env.enableDubboRemoval}}" >> $GITHUB_OUTPUT
echo "enableDubbo3Router=${{env.enableDubbo3Router}}" >> $GITHUB_OUTPUT
echo "enableDubbo3Lane=${{env.enableDubbo3Lane}}" >> $GITHUB_OUTPUT
echo "enableDubbo3Flow=${{env.enableDubbo3Flow}}" >> $GITHUB_OUTPUT
outputs:
enableDubboRouter: ${{ steps.set-outputs.outputs.enableDubboRouter }}
enableDubboMonitor: ${{ steps.set-outputs.outputs.enableDubboMonitor }}
Expand All @@ -57,6 +58,7 @@ jobs:
enableDubboRemoval: ${{ steps.set-outputs.outputs.enableDubboRemoval }}
enableDubbo3Router: ${{ steps.set-outputs.outputs.enableDubbo3Router }}
enableDubbo3Lane: ${{ steps.set-outputs.outputs.enableDubbo3Lane }}
enableDubbo3Flow: ${{ steps.set-outputs.outputs.enableDubbo3Flow }}
download-midwares-and-cache:
name: download and cache middlewares
runs-on: ubuntu-latest
Expand Down Expand Up @@ -152,7 +154,7 @@ jobs:
uses: ./.github/actions/scenarios/dubbo/removal
test-for-dubbo3:
name: Test for dubbo3.x
if: needs.set-execution-conditions.outputs.enableDubbo3Router == 'true' || needs.set-execution-conditions.outputs.enableDubbo3Lane == 'true'
if: needs.set-execution-conditions.outputs.enableDubbo3Router == 'true' || needs.set-execution-conditions.outputs.enableDubbo3Lane == 'true' || needs.set-execution-conditions.outputs.enableDubbo3Flow == 'true'
runs-on: ubuntu-latest
needs: [set-execution-conditions, download-midwares-and-cache, build-agent-and-cache ]
strategy:
Expand Down Expand Up @@ -241,3 +243,6 @@ jobs:
- name: (dubbo3.x lane) test for ${{ matrix.dubbo-version }}-${{ matrix.dubbo-versions }}
if: needs.set-execution-conditions.outputs.enableDubbo3Lane == 'true'
uses: ./.github/actions/scenarios/dubbo/lane-dubbo3
- name: (dubbo3.x flow) test for ${{ matrix.dubbo-version }}-${{ matrix.dubbo-versions }}
if: needs.set-execution-conditions.outputs.enableDubbo3Flow == 'true'
uses: ./.github/actions/scenarios/dubbo/dubbo3-flow
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.springframework.web.bind.annotation.RestController;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

Expand Down Expand Up @@ -55,7 +57,9 @@ public String rateLimitingWithHeader(@RequestParam(name = "key") String key, @Re
try {
RpcContext.getContext().setAttachment(key, value);
RpcContext.getContext().setAttachment(key2, value2);
return flowControlService.rateLimitingWithHeader(Collections.singletonMap(key, value));
Map<String, Object> attachments = new HashMap<>();
attachments.put(key, value);
return flowControlService.rateLimitingWithHeader(attachments);
} finally {
RpcContext.getContext().remove(key);
RpcContext.getContext().remove(key2);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright (C) 2025-2025 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.integration.flow.dubbo3;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;

import io.sermant.integration.utils.RequestUtils;

/**
* flow tests
*
* @author chengyouling
* @since 2025-02-19
*/
@EnabledIfEnvironmentVariable(named = "TEST_TYPE", matches = "dubbo3-flow")
public class Dubbo3FlowControlTest {
private static final String BASE_URL = "http://127.0.0.1:28020/consumer/flow/";
private static final int RATE_LIMITING_REQUEST_COUNT = 10;
private static final int BREAKER_REQUEST_COUNT = 10;
private static final String BREAKER_MSG = "is OPEN and does not permit further calls";
private static final String RATE_LIMITING_MSG = "Rate Limited";

/**
* 限流测试
*/
@Test
public void testRateLimiting() {
rateTest("rateLimiting");
rateTest("rateLimitingPrefix");
rateTest("rateLimitingSuffix");
rateTest("rateLimitingContains");
}

/**
* 限流测试-应用匹配测试
*/
@Test
public void testRateLimitingWithApplication() {
rateTest("rateLimitingWithApplication");
}

/**
* 限流测试-应用匹配测试
*/
@Test
public void testRateLimitingWithHeader() {
rateTest("rateLimitingWithHeader?key=key&value=attachment&key2=key2&value2=999");
rateTest("rateLimitingWithHeader?key=key&value=flowControlExact&key2=key2&value2=999");
rateTest("rateLimitingWithHeader?key=key&value=flowControlPrefix&key2=key2&value2=999");
rateTest("rateLimitingWithHeader?key=key&value=flowControlSuffix&key2=key2&value2=999");
rateTest("rateLimitingWithHeader?key=key&value=flowControlContains&key2=key2&value2=999");
rateTest("rateLimitingWithHeader?key=key&value=101&key2=key2&value2=999");
final AtomicBoolean check = new AtomicBoolean();
process("rateLimitingWithHeader?key=key&value=val&key2=key2&value2=998", RATE_LIMITING_MSG,
RATE_LIMITING_REQUEST_COUNT, check);
Assertions.assertFalse(check.get());
}

/**
* 限流测试-版本测试
*/
@Test
public void testRateLimitingWithVersion() {
rateTest("rateLimitingWithVersion");
}

/**
* 测试熔断-慢调用熔断
*/
@Test
public void cirEx() {
test("cirEx", BREAKER_MSG, BREAKER_REQUEST_COUNT);
}

/**
* 实例隔离-慢调用熔断
*/
@Test
public void instanceSlowInvoker() {
test("instanceSlowInvoker", BREAKER_MSG, BREAKER_REQUEST_COUNT);
}

/**
* 实例隔离-异常
*/
@Test
public void instanceEx() {
test("instanceEx", BREAKER_MSG, BREAKER_REQUEST_COUNT);
}

/**
* 错误注入-返回空
*/
@Test
public void faultNull() {
final String result = RequestUtils.get(BASE_URL + "faultNull", Collections.emptyMap(), String.class);
Assertions.assertNull(result);
}

/**
* 错误注入-抛异常
*/
@Test
public void faultThrowEx() {
test("faultThrowEx", "Request has been aborted by fault-ThrowException", 1);
}

/**
* 错误注入-返回空
*/
@Test
public void faultDelay() {
final long start = System.currentTimeMillis();
long delay = 2000L;
RequestUtils.get(BASE_URL + "faultDelay", Collections.emptyMap(), String.class);
Assertions.assertTrue((System.currentTimeMillis() - start) >= delay);
}

/**
* 隔离仓测试
*/
@Test
public void bulkHead() throws InterruptedException {
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(100));
int cycle = 5;
final CountDownLatch countDownLatch = new CountDownLatch(cycle);
final AtomicBoolean expected = new AtomicBoolean();
for (int i = 0; i < cycle; i++) {
threadPoolExecutor.execute(() -> {
try {
process("/bulkhead", "Bulkhead is full and does not permit further calls",
RATE_LIMITING_REQUEST_COUNT, expected);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
Assertions.assertTrue(expected.get());
threadPoolExecutor.shutdown();
}

/**
* 重试
*/
@Test
public void retry() {
final String result = RequestUtils.get(BASE_URL + "retry", Collections.emptyMap(), String.class);
Assertions.assertEquals(result, "3");
}

/**
* 测试熔断-异常
*/
@Test
public void cirSlowInvoker() {
test("cirSlowInvoker", BREAKER_MSG, BREAKER_REQUEST_COUNT);
}

private void rateTest(String api) {
test(api, RATE_LIMITING_MSG, RATE_LIMITING_REQUEST_COUNT);
}

private void test(String api, String msg, int requestCount) {
final AtomicBoolean check = new AtomicBoolean();
process(api, msg, requestCount, check);
Assertions.assertTrue(check.get());
}

private void process(String api, String flowControlMsg, int requestCount, AtomicBoolean check) {
String url = BASE_URL + api;
for (int i = 0; i < requestCount; i++) {
try {
RequestUtils.get(url, Collections.emptyMap(), String.class,
(clientHttpResponse, result) -> {
if (result.contains(flowControlMsg)) {
check.set(true);
}
return result;
});
} catch (Exception ex) {
if (ex.getMessage().contains(flowControlMsg)) {
check.set(true);
}
}
}
}
}
Loading

0 comments on commit d70612a

Please sign in to comment.