*: enable linter for executor/aggregate.go #37015

Merged · 15 commits · Aug 23, 2022
3 changes: 3 additions & 0 deletions build/nogo_config.json
@@ -154,6 +154,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
@@ -288,6 +289,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
@@ -648,6 +650,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"executor/aggregate.go": "executor/aggregate.go",
"types/json/binary_functions.go": "types/json/binary_functions.go",
"types/json/binary_test.go": "types/json/binary_test.go",
"ddl/backfilling.go": "ddl/backfilling.go",
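All three hunks above make the same addition to build/nogo_config.json, once per linter section (the analyzer names sit outside the expanded context). In nogo's configuration, `only_files` is an allowlist: each key is a regular expression matched against source file paths and the value is a free-form note, so adding `"executor/aggregate.go": "executor/aggregate.go"` opts the file into each of those checks.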
37 changes: 15 additions & 22 deletions executor/aggregate.go
@@ -231,7 +231,7 @@ type HashAggIntermData struct {
}

// getPartialResultBatch fetches a batch of partial results from HashAggIntermData.
func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
func (d *HashAggIntermData) getPartialResultBatch(_ *stmtctx.StatementContext, prs [][]aggfuncs.PartialResult, _ []aggfuncs.AggFunc, maxChunkSize int) (_ [][]aggfuncs.PartialResult, groupKeys []string, reachEnd bool) {
keyStart := d.cursor
for ; d.cursor < len(d.groupKeys) && len(prs) < maxChunkSize; d.cursor++ {
prs = append(prs, d.partialResultMap[d.groupKeys[d.cursor]])
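This hunk renames the unused `sc` and `aggFuncs` parameters to `_`, the fix that recurs throughout the file. Renaming rather than removing keeps the signature (and every call site) unchanged while satisfying an unused-parameter check. A minimal, self-contained sketch with hypothetical names:

```go
package main

import "fmt"

// Config stands in for a parameter that a signature must keep (for example,
// to match an interface or a shared function type) but never reads.
type Config struct{}

// sum ignores its first parameter. Writing `_` instead of a name keeps the
// signature stable while telling both readers and the unused-parameter
// check that the omission is deliberate.
func sum(_ *Config, xs []int) int {
	total := 0
	for _, x := range xs {
		total += x
	}
	return total
}

func main() {
	fmt.Println(sum(nil, []int{1, 2, 3})) // prints 6
}
```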
@@ -274,16 +274,9 @@ func (e *HashAggExec) Close() error {
close(e.finalOutputCh)
}
close(e.finishCh)
for _, ch := range e.partialOutputChs {
for range ch {
}
}
for _, ch := range e.partialInputChs {
for range ch {
}
}
for range e.finalOutputCh {
}
e.partialOutputChs = nil
e.partialInputChs = nil
e.finalOutputCh = nil
e.executed = false
if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
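Here the ten empty-bodied drain loops (`for range ch { }`), the kind of construct style analyzers such as revive's empty-block rule report, are deleted outright; the three added lines nil the channel fields instead. If a drain were still required, a lint-clean version names and discards the received value, as in this hedged sketch (not the approach the PR takes):

```go
package main

import "fmt"

// drain discards buffered values so that a sender blocked on a full buffer
// can proceed; it returns once ch is closed.
func drain(ch <-chan int) {
	for v := range ch {
		_ = v // discard explicitly; an empty loop body would trip empty-block checks
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	drain(ch)
	fmt.Println("drained")
}
```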
@@ -295,7 +288,7 @@ func (e *HashAggExec) Close() error {
// Open implements the Executor Open interface.
func (e *HashAggExec) Open(ctx context.Context) error {
failpoint.Inject("mockHashAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("mock HashAggExec.baseExecutor.Open returned error"))
}
})
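The unchecked assertion `val.(bool)` panics whenever the injected failpoint value is not a bool; the comma-ok rewrite shadows `val` with a bool that is simply false on a type mismatch, so the hook degrades gracefully instead of crashing. The same rewrite appears in three more failpoints below. A self-contained illustration of the difference:

```go
package main

import "fmt"

func check(v interface{}) {
	// Unchecked form: v.(bool) panics when v holds anything but a bool.
	// Comma-ok form: on a mismatch the bool is false and execution
	// continues, which is the behavior the rewritten hooks rely on.
	if b, _ := v.(bool); b {
		fmt.Println("failpoint enabled")
		return
	}
	fmt.Println("failpoint disabled or not a bool")
}

func main() {
	check(true)   // failpoint enabled
	check("oops") // failpoint disabled or not a bool
}
```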
@@ -352,7 +345,7 @@ func closeBaseExecutor(b *baseExecutor) {
}
}

func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
func (e *HashAggExec) initForParallelExec(_ sessionctx.Context) {
sessionVars := e.ctx.GetSessionVars()
finalConcurrency := sessionVars.HashAggFinalConcurrency()
partialConcurrency := sessionVars.HashAggPartialConcurrency()
@@ -486,7 +479,7 @@ func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitG
}
if w.stats != nil {
w.stats.ExecTime += int64(time.Since(execStart))
w.stats.TaskNum += 1
w.stats.TaskNum++
}
// The intermData can be promised to be not empty if reaching here,
// so we set needShuffle to be true.
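The only change in this hunk is `w.stats.TaskNum += 1` becoming `w.stats.TaskNum++` (mirrored in `consumeIntermData` further down): the increment-decrement style rule, present in revive and gocritic, prefers Go's dedicated `++`/`--` operators over adding or subtracting a literal 1.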
@@ -503,7 +496,7 @@ func getGroupKeyMemUsage(groupKey [][]byte) int64 {
return mem
}

func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, _ int) (err error) {
memSize := getGroupKeyMemUsage(w.groupKey)
w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
failpoint.Inject("ConsumeRandomPanic", nil)
@@ -532,7 +525,7 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *s

// shuffleIntermData shuffles the intermediate data of partial workers to corresponded final workers.
// We only support parallel execution for single-machine, so process of encode and decode can be skipped.
func (w *HashAggPartialWorker) shuffleIntermData(sc *stmtctx.StatementContext, finalConcurrency int) {
func (w *HashAggPartialWorker) shuffleIntermData(_ *stmtctx.StatementContext, finalConcurrency int) {
groupKeysSlice := make([][]string, finalConcurrency)
for groupKey := range w.partialResultsMap {
finalWorkerIdx := int(murmur3.Sum32([]byte(groupKey))) % finalConcurrency
@@ -605,7 +598,7 @@ func getGroupKey(ctx sessionctx.Context, input *chunk.Chunk, groupKey [][]byte,
return groupKey, nil
}

func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
func (w *baseHashAggWorker) getPartialResult(_ *stmtctx.StatementContext, groupKey [][]byte, mapper aggPartialResultMapper) [][]aggfuncs.PartialResult {
n := len(groupKey)
partialResults := make([][]aggfuncs.PartialResult, n)
allMemDelta := int64(0)
@@ -706,7 +699,7 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err err
}
if w.stats != nil {
w.stats.ExecTime += int64(time.Since(execStart))
w.stats.TaskNum += 1
w.stats.TaskNum++
}
}
}
@@ -906,7 +899,7 @@ func (e *HashAggExec) parallelExec(ctx context.Context, chk *chunk.Chunk) error
}

failpoint.Inject("parallelHashAggError", func(val failpoint.Value) {
if val.(bool) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("HashAggExec.parallelExec error"))
}
})
@@ -1011,7 +1004,7 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
}

failpoint.Inject("unparallelHashAggError", func(val failpoint.Value) {
if val.(bool) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("HashAggExec.unparallelExec error"))
}
})
@@ -1263,7 +1256,7 @@ type StreamAggExec struct {
// Open implements the Executor Open interface.
func (e *StreamAggExec) Open(ctx context.Context) error {
failpoint.Inject("mockStreamAggExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
if val.(bool) {
if val, _ := val.(bool); val {
failpoint.Return(errors.New("mock StreamAggExec.baseExecutor.Open returned error"))
}
})
@@ -1955,4 +1948,4 @@ func (a *AggSpillDiskAction) GetPriority() int64 {
}

// SetLogHook sets the hook, it does nothing just to form the memory.ActionOnExceed interface.
func (a *AggSpillDiskAction) SetLogHook(hook func(uint64)) {}
func (a *AggSpillDiskAction) SetLogHook(_ func(uint64)) {}
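The final hunk shows why these parameters are renamed rather than deleted: `SetLogHook` exists only so that `AggSpillDiskAction` forms the `memory.ActionOnExceed` interface, so its signature cannot change. A hedged sketch of the pattern with hypothetical names:

```go
package main

import "fmt"

// Hook is a hypothetical interface that pins the method signature; the
// names here are illustrative, not tidb's memory.ActionOnExceed.
type Hook interface {
	SetLogHook(hook func(uint64))
}

// noopHook satisfies Hook without using the argument. The parameter cannot
// be dropped (the interface fixes the signature), so the blank identifier
// is the idiomatic way to silence the unused-parameter check.
type noopHook struct{}

func (noopHook) SetLogHook(_ func(uint64)) {}

func main() {
	var h Hook = noopHook{}
	h.SetLogHook(func(uint64) {})
	fmt.Println("no-op hook installed")
}
```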