*: enable linter for executor/aggregate.go (pingcap#37015)
wshwsh12 authored Aug 23, 2022
1 parent 45588a1 commit afc3276
Showing 5 changed files with 53 additions and 21 deletions.
3 changes: 3 additions & 0 deletions build/nogo_config.json
@@ -154,6 +154,7 @@
       ".*_generated\\.go$": "ignore generated code"
     },
     "only_files": {
+      "executor/aggregate.go": "executor/aggregate.go",
       "types/json/binary_functions.go": "types/json/binary_functions.go",
       "types/json/binary_test.go": "types/json/binary_test.go",
       "ddl/backfilling.go": "ddl/backfilling.go",
@@ -289,6 +290,7 @@
       ".*_generated\\.go$": "ignore generated code"
     },
     "only_files": {
+      "executor/aggregate.go": "executor/aggregate.go",
       "types/json/binary_functions.go": "types/json/binary_functions.go",
       "types/json/binary_test.go": "types/json/binary_test.go",
       "ddl/backfilling.go": "ddl/backfilling.go",
@@ -649,6 +651,7 @@
       ".*_generated\\.go$": "ignore generated code"
     },
     "only_files": {
+      "executor/aggregate.go": "executor/aggregate.go",
       "types/json/binary_functions.go": "types/json/binary_functions.go",
       "types/json/binary_test.go": "types/json/binary_test.go",
       "ddl/": "enable to ddl",
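
Note: in nogo_config.json, "only_files" appears to act as a per-linter allowlist — a check runs only on paths matching one of the keys — so adding the executor/aggregate.go entry under three linter sections is what actually enables those checks for this file; the value string is just a human-readable note.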
1 change: 1 addition & 0 deletions executor/BUILD.bazel
@@ -154,6 +154,7 @@ go_library(
         "//util/admin",
         "//util/bitmap",
         "//util/breakpoint",
+        "//util/channel",
         "//util/chunk",
         "//util/codec",
         "//util/collate",
40 changes: 19 additions & 21 deletions executor/aggregate.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/types/json"
+	"github.com/pingcap/tidb/util/channel"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/codec"
 	"github.com/pingcap/tidb/util/disk"
@@ -231,7 +232,7 @@ type HashAggIntermData struct {
 }
 
 // getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
-func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
+func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, _ []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
 	keyStart := d.cursor
 	for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ {
 		prs = append(prs, d.partialResultMap[d.groupKeys[d.cursor]])
@@ -275,15 +276,12 @@ func (e *HashAggExec) Close() error {
 	}
 	close(e.finishCh)
 	for _, ch := range e.partialOutputChs {
-		for range ch {
-		}
+		channel.Clear(ch)
 	}
 	for _, ch := range e.partialInputChs {
-		for range ch {
-		}
+		channel.Clear(ch)
 	}
-	for range e.finalOutputCh {
-	}
+	channel.Clear(e.finalOutputCh)
 	e.executed = false
 	if e.memTracker != nil {
 		e.memTracker.ReplaceBytesUsed(0)
@@ -295,7 +293,7 @@ func (e *HashAggExec) Close() error {
 // Open implements the Executor Open interface.
 func (e *HashAggExec) Open(ctx context.Context) error {
 	failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
 			failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
 		}
 	})
@@ -352,7 +350,7 @@ func closeBaseExecutor(b *baseExecutor) {
 	}
 }
 
-func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
+func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
 	sessionVars := e.ctx.GetSessionVars()
 	finalConcurrency := sessionVars.HashAggFinalConcurrency()
 	partialConcurrency := sessionVars.HashAggPartialConcurrency()
@@ -486,7 +484,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
 	}
 	if w.stats != nil {
 		w.stats.ExecTime += int64(time.Since(execStart))
-		w.stats.TaskNum += 1
+		w.stats.TaskNum++
 	}
 	// The intermData can be promised to be not empty if reaching here,
 	// so we set needShuffle to be true.
@@ -503,7 +501,7 @@ func getGroupKeyMemUsage(groupKey [][]byte) int64 {
 	return mem
 }
 
-func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
+func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, _ int) (err error) {
 	memSize := getGroupKeyMemUsage(w.groupKey)
 	w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
 	failpoint.Inject("ConsumeRandomPanic", nil)
@@ -532,7 +530,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s
 
 // shuffleIntermData shuffles the intermediate data of partial workers to corresponded final workers.
 // We only support parallel execution for single-machine, so process of encode and decode can be skipped.
-func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) {
+func (w *HashAggPartialWorker) shuffleIntermData(_ *stmtctx.StatementContext, finalConcurrency int) {
 	groupKeysSlice := make([][]string, finalConcurrency)
 	for groupKey := range w.partialResultsMap {
 		finalWorkerIdx := int(murmur3.Sum32([]byte(groupKey))) % finalConcurrency
@@ -605,7 +603,7 @@ func getGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
 	return groupKey, nil
 }
 
-func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
+func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
 	n := len(groupKey)
 	partialResults := make([][]aggfuncs.PartialResult, n)
 	allMemDelta := int64(0)
@@ -706,7 +704,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err
 		}
 		if w.stats != nil {
 			w.stats.ExecTime += int64(time.Since(execStart))
-			w.stats.TaskNum += 1
+			w.stats.TaskNum++
 		}
 	}
 }
@@ -906,7 +904,7 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
 	}
 
 	failpoint.Inject("parallelHashAggError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
 			failpoint.Return(errors.New("HashAggExec.parallelExec error"))
 		}
 	})
@@ -1011,7 +1009,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
 	}
 
 	failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
 			failpoint.Return(errors.New("HashAggExec.unparallelExec error"))
 		}
 	})
@@ -1170,7 +1168,7 @@ func (w *AggWorkerStat) Clone() *AggWorkerStat {
 	}
 }
 
-func (e *HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
+func (*HashAggRuntimeStats) workerString(buf *bytes.Buffer, prefix string, concurrency int, wallTime int64, workerStats []*AggWorkerStat) {
 	var totalTime, totalWait, totalExec, totalTaskNum int64
 	for _, w := range workerStats {
 		totalTime += w.WorkerTime
@@ -1231,7 +1229,7 @@ func (e *HashAggRuntimeStats) Merge(other execdetails.RuntimeStats) {
 }
 
 // Tp implements the RuntimeStats interface.
-func (e *HashAggRuntimeStats) Tp() int {
+func (*HashAggRuntimeStats) Tp() int {
 	return execdetails.TpHashAggRuntimeStat
 }
 
@@ -1263,7 +1261,7 @@ type StreamAggExec struct {
 // Open implements the Executor Open interface.
 func (e *StreamAggExec) Open(ctx context.Context) error {
 	failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
-		if val.(bool) {
+		if val, _ := val.(bool); val {
 			failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
 		}
 	})
@@ -1950,9 +1948,9 @@ func (a *AggSpillDiskAction) Action(t *memory.Tracker) {
 }
 
 // GetPriority get the priority of the Action
-func (a *AggSpillDiskAction) GetPriority() int64 {
+func (*AggSpillDiskAction) GetPriority() int64 {
 	return memory.DefSpillPriority
 }
 
 // SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
-func (a *AggSpillDiskAction) SetLogHook(hook func(uint64)) {}
+func (*AggSpillDiskAction) SetLogHook(_ func(uint64)) {}
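
Most of the aggregate.go changes follow two mechanical lint rules: an identifier that is never read (parameter, receiver, or hook argument) is renamed to _, and x += 1 becomes x++. The failpoint edits are the one behavioral hardening: val.(bool) panics if the injected value is not a bool, while the comma-ok form fails closed. Below is a minimal standalone sketch of the two assertion forms (illustrative only, not TiDB code):

package main

import "fmt"

func main() {
	var val interface{} = "not a bool"

	// Comma-ok assertion: ok reports whether the dynamic type matched,
	// and enabled stays false on a mismatch instead of panicking.
	enabled, ok := val.(bool)
	fmt.Println(enabled, ok) // false false

	// The one-line form used in the commit shadows val with the asserted bool:
	if v, _ := val.(bool); v {
		fmt.Println("failpoint enabled")
	}

	// By contrast, `enabled := val.(bool)` would panic here at runtime.
}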
8 changes: 8 additions & 0 deletions util/channel/BUILD.bazel
@@ -0,0 +1,8 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "channel",
+    srcs = ["channel.go"],
+    importpath = "github.com/pingcap/tidb/util/channel",
+    visibility = ["//visibility:public"],
+)
22 changes: 22 additions & 0 deletions util/channel/channel.go
@@ -0,0 +1,22 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package channel
+
+// Clear is to clear the channel
+func Clear[T any](ch chan T) {
+	//nolint:revive,all_revive
+	for range ch {
+	}
+}
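
Clear drains every buffered element and returns only once the channel is closed, because for range on a channel blocks while the channel stays open; callers such as HashAggExec.Close presumably rely on the producing goroutines closing these channels first. A short usage sketch under that assumption (hypothetical values; Go 1.18+ for generics):

package main

import "fmt"

// Clear mirrors util/channel.Clear: drain ch until it is closed.
func Clear[T any](ch chan T) {
	for range ch {
	}
}

func main() {
	ch := make(chan int, 4)
	for i := 0; i < 4; i++ {
		ch <- i
	}
	close(ch) // without this, Clear would block forever

	Clear(ch)
	fmt.Println(len(ch)) // 0: buffer fully drained
}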
