From afc3276d9fa8bb009fce31fb021eb1907cc2c8e1 Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 23 Aug 2022 22:20:21 +0800 Subject: [PATCH] *: enable linter for executor/aggregate.go (#37015) --- build/nogo_config.json | 3 +++ executor/BUILD.bazel | 1 + executor/aggregate.go | 40 +++++++++++++++++++--------------------- util/channel/BUILD.bazel | 8 ++++++++ util/channel/channel.go | 22 ++++++++++++++++++++++ 5 files changed, 53 insertions(+), 21 deletions(-) create mode 100644 util/channel/BUILD.bazel create mode 100644 util/channel/channel.go diff --git a/build/nogo_config.json b/build/nogo_config.json index 3cd9d4941f00e..cbed29bc1bc68 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -154,6 +154,7 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "executor/aggregate.go": "executor/aggregate.go", "types/json/binary_functions.go": "types/json/binary_functions.go", "types/json/binary_test.go": "types/json/binary_test.go", "ddl/backfilling.go": "ddl/backfilling.go", @@ -289,6 +290,7 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "executor/aggregate.go": "executor/aggregate.go", "types/json/binary_functions.go": "types/json/binary_functions.go", "types/json/binary_test.go": "types/json/binary_test.go", "ddl/backfilling.go": "ddl/backfilling.go", @@ -649,6 +651,7 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "executor/aggregate.go": "executor/aggregate.go", "types/json/binary_functions.go": "types/json/binary_functions.go", "types/json/binary_test.go": "types/json/binary_test.go", "ddl/": "enable to ddl", diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 3317d670ab387..0e7ab7a56ca44 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -154,6 +154,7 @@ go_library( "//util/admin", "//util/bitmap", "//util/breakpoint", + "//util/channel", "//util/chunk", "//util/codec", "//util/collate", diff --git a/executor/aggregate.go b/executor/aggregate.go index d33f5c8fcee5e..1fa60eb07630e 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" + "github.com/pingcap/tidb/util/channel" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/disk" @@ -231,7 +232,7 @@ type HashAggIntermData struct { } // getPartialResultBatch fetches a batch of partial results from HashAggIntermData. -func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) { +func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, _ []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) { keyStart := d.cursor for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ { prs = append(prs, d.partialResultMap[d.groupKeys[d.cursor]]) @@ -275,15 +276,12 @@ func (e *HashAggExec) Close() error { } close(e.finishCh) for _, ch := range e.partialOutputChs { - for range ch { - } + channel.Clear(ch) } for _, ch := range e.partialInputChs { - for range ch { - } - } - for range e.finalOutputCh { + channel.Clear(ch) } + channel.Clear(e.finalOutputCh) e.executed = false if e.memTracker != nil { e.memTracker.ReplaceBytesUsed(0) @@ -295,7 +293,7 @@ func (e *HashAggExec) Close() error { // Open implements the Executor Open interface. func (e *HashAggExec) Open(ctx context.Context) error { failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { - if val.(bool) { + if val, _ := val.(bool); val { failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error")) } }) @@ -352,7 +350,7 @@ func closeBaseExecutor(b *baseExecutor) { } } -func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) { +func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) { sessionVars := e.ctx.GetSessionVars() finalConcurrency := sessionVars.HashAggFinalConcurrency() partialConcurrency := sessionVars.HashAggPartialConcurrency() @@ -486,7 +484,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG } if w.stats != nil { w.stats.ExecTime += int64(time.Since(execStart)) - w.stats.TaskNum += 1 + w.stats.TaskNum++ } // The intermData can be promised to be not empty if reaching here, // so we set needShuffle to be true. @@ -503,7 +501,7 @@ func getGroupKeyMemUsage(groupKey [][]byte) int64 { return mem } -func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) { +func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, _ int) (err error) { memSize := getGroupKeyMemUsage(w.groupKey) w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems) failpoint.Inject("ConsumeRandomPanic", nil) @@ -532,7 +530,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s // shuffleIntermData shuffles the intermediate data of partial workers to corresponded final workers. // We only support parallel execution for single-machine, so process of encode and decode can be skipped. -func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) { +func (w *HashAggPartialWorker) shuffleIntermData(_ *stmtctx.StatementContext, finalConcurrency int) { groupKeysSlice := make([][]string, finalConcurrency) for groupKey := range w.partialResultsMap { finalWorkerIdx := int(murmur3.Sum32([]byte(groupKey))) % finalConcurrency @@ -605,7 +603,7 @@ func getGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte, return groupKey, nil } -func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult { +func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult { n := len(groupKey) partialResults := make([][]aggfuncs.PartialResult, n) allMemDelta := int64(0) @@ -706,7 +704,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err } if w.stats != nil { w.stats.ExecTime += int64(time.Since(execStart)) - w.stats.TaskNum += 1 + w.stats.TaskNum++ } } } @@ -906,7 +904,7 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error } failpoint.Inject("parallelHashAggError", func(val failpoint.Value) { - if val.(bool) { + if val, _ := val.(bool); val { failpoint.Return(errors.New("HashAggExec.parallelExec error")) } }) @@ -1011,7 +1009,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) { } failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) { - if val.(bool) { + if val, _ := val.(bool); val { failpoint.Return(errors.New("HashAggExec.unparallelExec error")) } }) @@ -1170,7 +1168,7 @@ func (w *AggWorkerStat) Clone() *AggWorkerStat { } } -func (e *HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) { +func (*HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) { var totalTime, totalWait, totalExec, totalTaskNum int64 for _, w := range workerStats { totalTime += w.WorkerTime @@ -1231,7 +1229,7 @@ func (e *HashAggRuntimeStats) Merge(other execdetails.RuntimeStats) { } // Tp implements the RuntimeStats interface. -func (e *HashAggRuntimeStats) Tp() int { +func (*HashAggRuntimeStats) Tp() int { return execdetails.TpHashAggRuntimeStat } @@ -1263,7 +1261,7 @@ type StreamAggExec struct { // Open implements the Executor Open interface. func (e *StreamAggExec) Open(ctx context.Context) error { failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) { - if val.(bool) { + if val, _ := val.(bool); val { failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error")) } }) @@ -1950,9 +1948,9 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) { } // GetPriority get the priority of the Action -func (a *AggSpillDiskAction) GetPriority() int64 { +func (*AggSpillDiskAction) GetPriority() int64 { return memory.DefSpillPriority } // SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface. -func (a *AggSpillDiskAction) SetLogHook(hook func(uint64)) {} +func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {} diff --git a/util/channel/BUILD.bazel b/util/channel/BUILD.bazel new file mode 100644 index 0000000000000..e04118364ed1d --- /dev/null +++ b/util/channel/BUILD.bazel @@ -0,0 +1,8 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "channel", + srcs = ["channel.go"], + importpath = "github.com/pingcap/tidb/util/channel", + visibility = ["//visibility:public"], +) diff --git a/util/channel/channel.go b/util/channel/channel.go new file mode 100644 index 0000000000000..e9956b2f97050 --- /dev/null +++ b/util/channel/channel.go @@ -0,0 +1,22 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package channel + +// Clear is to clear the channel +func Clear[T any](ch chan T) { + //nolint:revive,all_revive + for range ch { + } +}