From e41d3c7fccd6db7ba5bb6998e2c3e838c2013d60 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 6 Jul 2022 11:40:23 +0800 Subject: [PATCH 1/4] store/copr: set low concurrency for keep order coprocessor requests --- store/copr/coprocessor.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 29cc437b182e6..418aeafb5055b 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -120,6 +120,10 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa } if it.req.KeepOrder { + // Don't set high concurrency for the keep order case. It wastes a lot of memory. + if it.concurrency > 2 { + it.concurrency = 2 + } it.sendRate = util.NewRateLimit(2 * it.concurrency) it.respChan = nil } else { From ad4c86a688bfebed650e7130d73e26403aa6e160 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 6 Jul 2022 12:09:20 +0800 Subject: [PATCH 2/4] update comment --- store/copr/coprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 418aeafb5055b..0f9da8d44a138 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -120,7 +120,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa } if it.req.KeepOrder { - // Don't set high concurrency for the keep order case. It wastes a lot of memory. + // Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing. if it.concurrency > 2 { it.concurrency = 2 } From d5ffeb1615977699a96a66a707572d271bac5b0b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 6 Jul 2022 17:47:20 +0800 Subject: [PATCH 3/4] address comment --- store/copr/coprocessor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 0f9da8d44a138..11bb04a83c25e 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -121,6 +121,11 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa if it.req.KeepOrder { // Don't set high concurrency for the keep order case. It wastes a lot of memory and gains nothing. + // TL;DR + // Because for a keep order coprocessor request, the cop tasks are handled one by one, if we set a + // higher concurrency, the data is just cached and not consumed for a while, this increase the memory usage. + // Set concurrency to 2 can reduce the memory usage and I've tested that it does not necessarily + // decrease the performance. if it.concurrency > 2 { it.concurrency = 2 } From 4f753fe930d0e01ebcc9f21af8e6c66eca1962b6 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Sun, 17 Jul 2022 09:08:56 +0800 Subject: [PATCH 4/4] fix CI --- Makefile | 4 ++-- store/copr/coprocessor.go | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 9200683f459f8..a10eaef06b208 100644 --- a/Makefile +++ b/Makefile @@ -436,8 +436,8 @@ bazel_test: failpoint-enable bazel_ci_prepare bazel_coverage_test: failpoint-enable bazel_ci_prepare bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \ - -- //... -//cmd/... -//tests/... \ - -//br/pkg/task:task_test + -- //... -//cmd/... -//tests/graceshutdown/... \ + -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test bazel_build: bazel_ci_prepare mkdir -p bin diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 161da48b39a1c..eb151254d2a26 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -127,7 +127,15 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa // Set concurrency to 2 can reduce the memory usage and I've tested that it does not necessarily // decrease the performance. if it.concurrency > 2 { + oldConcurrency := it.concurrency it.concurrency = 2 + + failpoint.Inject("testRateLimitActionMockConsumeAndAssert", func(val failpoint.Value) { + if val.(bool) { + // When the concurrency is too small, test case tests/realtikvtest/sessiontest.TestCoprocessorOOMAction can't trigger OOM condition + it.concurrency = oldConcurrency + } + }) } it.sendRate = util.NewRateLimit(2 * it.concurrency) it.respChan = nil