Skip to content

Commit

Permalink
workerpool: fix block on Tune when all workers finished (pingcap#59271)
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Feb 6, 2025
1 parent 6673923 commit 444c38f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 2 deletions.
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPi
case <-ctx.Done():
return
case <-ticker.C:
failpoint.InjectCall("onUpdateJobParam")
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeed()
if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
Expand Down Expand Up @@ -876,7 +877,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job
}

err = pipe.Close()

failpoint.InjectCall("afterPipeLineClose")
cancel()
wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
if opErr := ctx.OperatorErr(); opErr != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ func (w *worker) runOneJobStep(
case <-stopCheckingJobCancelled:
return
case <-ticker.C:
failpoint.InjectCall("checkJobCancelled", job)
latestJob, err := sysTblMgr.GetJobByID(w.workCtx, job.ID)
if err == systable.ErrNotFound {
logutil.DDLLogger().Info(
Expand Down
2 changes: 2 additions & 0 deletions pkg/resourcemanager/pool/workerpool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ go_library(
deps = [
"//pkg/resourcemanager/util",
"//pkg/util",
"//pkg/util/logutil",
"//pkg/util/syncutil",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

Expand Down
11 changes: 10 additions & 1 deletion pkg/resourcemanager/pool/workerpool/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (

"github.com/pingcap/tidb/pkg/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/syncutil"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)

// TaskMayPanic is a type to remind the developer that need to handle panic in
Expand Down Expand Up @@ -196,8 +198,15 @@ func (p *WorkerPool[T, R]) Tune(numWorkers int32) {
}
} else if diff < 0 {
// Remove workers
outer:
for i := 0; i < int(-diff); i++ {
p.quitChan <- struct{}{}
select {
case p.quitChan <- struct{}{}:
case <-p.ctx.Done():
logutil.BgLogger().Info("context done when tuning worker pool",
zap.Int32("from", p.numWorkers), zap.Int32("to", numWorkers))
break outer
}
}
}
p.numWorkers = numWorkers
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_test(
name = "addindextest2_test",
timeout = "long",
srcs = [
"alter_job_test.go",
"global_sort_test.go",
"main_test.go",
],
Expand Down
63 changes: 63 additions & 0 deletions tests/realtikvtest/addindextest2/alter_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package addindextest

import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/tests/realtikvtest"
)

func TestAlterThreadRightAfterJobFinish(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_dist_task=0;")
tk.MustExec("create table t (c1 int primary key, c2 int)")
tk.MustExec("insert t values (1, 1), (2, 2), (3, 3);")
var updated bool
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/checkJobCancelled", func(job *model.Job) {
if !updated && job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization {
updated = true
fmt.Println("TEST-LOG: set thread=1")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 1", job.ID))
}
})
var pipeClosed atomic.Bool
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterPipeLineClose", func() {
fmt.Println("TEST-LOG: start sleep")
pipeClosed.Store(true)
time.Sleep(5 * time.Second)
fmt.Println("TEST-LOG: end sleep")
})
var onUpdateJobParam bool
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onUpdateJobParam", func() {
if !onUpdateJobParam {
onUpdateJobParam = true
for !pipeClosed.Load() {
time.Sleep(100 * time.Millisecond)
}
fmt.Println("TEST-LOG: proceed update param")
}
})
tk.MustExec("alter table t add index idx(c2)")
}

0 comments on commit 444c38f

Please sign in to comment.