Skip to content

Commit

Permalink
executor: modify some hash join stats (#59668)
Browse files Browse the repository at this point in the history
close #59667
  • Loading branch information
xzhangxian1008 authored Feb 26, 2025
1 parent db15d43 commit 8010959
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 55 deletions.
1 change: 1 addition & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,7 @@ func (b *executorBuilder) buildHashJoinV2(v *plannercore.PhysicalHashJoin) exec.
HashJoinCtxV2: &join.HashJoinCtxV2{
OtherCondition: joinOtherCondition,
},
IsGA: plannercore.IsGAForHashJoinV2(v.JoinType, v.LeftJoinKeys, v.IsNullEQ, v.LeftNAJoinKeys),
}
e.HashJoinCtxV2.SessCtx = b.ctx
e.HashJoinCtxV2.JoinType = v.JoinType
Expand Down
135 changes: 87 additions & 48 deletions pkg/executor/join/hash_join_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,25 @@ type hashJoinRuntimeStatsV2 struct {

fetchAndBuildHashTable int64

partitionData int64
buildHashTable int64
probe int64
fetchAndProbe int64
partitionData int64
buildHashTable int64
probe int64
fetchAndProbe int64
workerFetchAndProbe int64

maxPartitionData int64
maxBuildHashTable int64
maxProbe int64
maxFetchAndProbe int64
maxPartitionData int64
maxBuildHashTable int64
maxProbe int64
maxWorkerFetchAndProbe int64

maxPartitionDataForCurrentRound int64
maxBuildHashTableForCurrentRound int64
maxProbeForCurrentRound int64
maxFetchAndProbeForCurrentRound int64
maxPartitionDataForCurrentRound int64
maxBuildHashTableForCurrentRound int64
maxProbeForCurrentRound int64
maxWorkerFetchAndProbeForCurrentRound int64

spill spillStats

isHashJoinGA bool
}

func setMaxValue(addr *int64, currentValue int64) {
Expand All @@ -182,25 +185,26 @@ func (e *hashJoinRuntimeStatsV2) reset() {
e.buildHashTable = 0
e.probe = 0
e.fetchAndProbe = 0
e.workerFetchAndProbe = 0
e.maxPartitionData = 0
e.maxBuildHashTable = 0
e.maxProbe = 0
e.maxFetchAndProbe = 0
e.maxWorkerFetchAndProbe = 0
e.maxPartitionDataForCurrentRound = 0
e.maxBuildHashTableForCurrentRound = 0
e.maxProbeForCurrentRound = 0
e.maxFetchAndProbeForCurrentRound = 0
e.maxWorkerFetchAndProbeForCurrentRound = 0
}

// resetCurrentRound folds the per-round maxima into their accumulated max*
// counterparts and then zeroes the per-round values.
func (e *hashJoinRuntimeStatsV2) resetCurrentRound() {
// Add this round's maximum of each metric to the accumulated maximum.
e.maxPartitionData += e.maxPartitionDataForCurrentRound
e.maxBuildHashTable += e.maxBuildHashTableForCurrentRound
e.maxProbe += e.maxProbeForCurrentRound
e.maxFetchAndProbe += e.maxFetchAndProbeForCurrentRound
e.maxWorkerFetchAndProbe += e.maxWorkerFetchAndProbeForCurrentRound
// Clear the per-round maxima.
e.maxPartitionDataForCurrentRound = 0
e.maxBuildHashTableForCurrentRound = 0
e.maxProbeForCurrentRound = 0
e.maxFetchAndProbeForCurrentRound = 0
e.maxWorkerFetchAndProbeForCurrentRound = 0
}

// Tp implements the RuntimeStats interface.
Expand All @@ -211,25 +215,60 @@ func (*hashJoinRuntimeStatsV2) Tp() int {
func (e *hashJoinRuntimeStatsV2) String() string {
buf := bytes.NewBuffer(make([]byte, 0, 128))
if e.fetchAndBuildHashTable > 0 {
buf.WriteString("build_hash_table:{total:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable)))
buf.WriteString(", fetch:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable - e.maxBuildHashTable - e.maxPartitionData)))
buf.WriteString(", build:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxBuildHashTable + e.maxPartitionData)))
buf.WriteString("}")
if e.isHashJoinGA {
buf.WriteString("build_hash_table:{concurrency:")
buf.WriteString(strconv.Itoa(e.concurrent))
buf.WriteString(", time:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable)))
buf.WriteString(", fetch:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable - e.maxBuildHashTable - e.maxPartitionData)))
buf.WriteString(", max_partition:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxPartitionData)))
buf.WriteString(", total_partition:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.partitionData)))
buf.WriteString(", max_build:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxBuildHashTable)))
buf.WriteString(", total_build:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.buildHashTable)))
buf.WriteString("}")
} else {
buf.WriteString("build_hash_table:{total:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable)))
buf.WriteString(", fetch:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndBuildHashTable - e.maxBuildHashTable - e.maxPartitionData)))
buf.WriteString(", build:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxBuildHashTable + e.maxPartitionData)))
buf.WriteString("}")
}
}

if e.probe > 0 {
buf.WriteString(", probe:{concurrency:")
buf.WriteString(strconv.Itoa(e.concurrent))
buf.WriteString(", total:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe)))
buf.WriteString(", max:")
buf.WriteString(execdetails.FormatDuration(time.Duration(atomic.LoadInt64(&e.maxFetchAndProbe))))
buf.WriteString(", probe:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxProbe)))
buf.WriteString(", fetch and wait:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.maxProbe)))
if e.isHashJoinGA {
buf.WriteString(", time:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe)))
buf.WriteString(", fetch_and_wait:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.maxProbe)))
buf.WriteString(", max_worker_time:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxWorkerFetchAndProbe)))
buf.WriteString(", total_worker_time:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.workerFetchAndProbe)))
buf.WriteString(", max_probe:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxProbe)))
buf.WriteString(", total_probe:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.probe)))
} else {
buf.WriteString(", total:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe)))
buf.WriteString(", max:")
buf.WriteString(execdetails.FormatDuration(time.Duration(atomic.LoadInt64(&e.maxWorkerFetchAndProbe))))
buf.WriteString(", probe:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.maxProbe)))
buf.WriteString(", fetch_and_wait:")
buf.WriteString(execdetails.FormatDuration(time.Duration(e.fetchAndProbe - e.maxProbe)))
}

if e.probeCollision > 0 {
buf.WriteString(", probe_collision:")
buf.WriteString(strconv.FormatInt(e.probeCollision, 10))
Expand All @@ -254,21 +293,21 @@ func (e *hashJoinRuntimeStatsV2) String() string {

// Clone implements the RuntimeStats interface.
//
// It returns a new hashJoinRuntimeStatsV2 carrying a copy of e's counters.
// workerFetchAndProbe and isHashJoinGA are copied as well: String reads
// isHashJoinGA to choose its output layout and prints workerFetchAndProbe as
// total_worker_time, so a clone without them would render differently from
// the stats it was cloned from.
//
// NOTE(review): the spill field is not copied here, matching the previous
// behavior of this method; confirm whether clones are expected to carry
// spill statistics.
func (e *hashJoinRuntimeStatsV2) Clone() execdetails.RuntimeStats {
	return &hashJoinRuntimeStatsV2{
		concurrent:                            e.concurrent,
		probeCollision:                        e.probeCollision,
		fetchAndBuildHashTable:                e.fetchAndBuildHashTable,
		partitionData:                         e.partitionData,
		buildHashTable:                        e.buildHashTable,
		probe:                                 e.probe,
		fetchAndProbe:                         e.fetchAndProbe,
		workerFetchAndProbe:                   e.workerFetchAndProbe,
		maxPartitionData:                      e.maxPartitionData,
		maxBuildHashTable:                     e.maxBuildHashTable,
		maxProbe:                              e.maxProbe,
		maxWorkerFetchAndProbe:                e.maxWorkerFetchAndProbe,
		maxPartitionDataForCurrentRound:       e.maxPartitionDataForCurrentRound,
		maxBuildHashTableForCurrentRound:      e.maxBuildHashTableForCurrentRound,
		maxProbeForCurrentRound:               e.maxProbeForCurrentRound,
		maxWorkerFetchAndProbeForCurrentRound: e.maxWorkerFetchAndProbeForCurrentRound,
		isHashJoinGA:                          e.isHashJoinGA,
	}
}

Expand All @@ -289,7 +328,7 @@ func (e *hashJoinRuntimeStatsV2) Merge(rs execdetails.RuntimeStats) {
e.probeCollision += tmp.probeCollision
e.fetchAndProbe += tmp.fetchAndProbe
e.probe += tmp.probe
if e.maxFetchAndProbe < tmp.maxFetchAndProbe {
e.maxFetchAndProbe = tmp.maxFetchAndProbe
if e.maxWorkerFetchAndProbe < tmp.maxWorkerFetchAndProbe {
e.maxWorkerFetchAndProbe = tmp.maxWorkerFetchAndProbe
}
}
6 changes: 5 additions & 1 deletion pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,9 @@ type ProbeWorkerV2 struct {
// updateProbeStatistic records one probe worker's timing in
// w.HashJoinCtx.stats. start is the instant from which the worker's elapsed
// time is measured, and probeTime is added to the probe counter as given
// (presumably nanoseconds, since String formats these counters through
// time.Duration — TODO confirm against callers).
func (w *ProbeWorkerV2) updateProbeStatistic(start time.Time, probeTime int64) {
// Time elapsed since start.
t := time.Since(start)
// Accumulate the totals with atomic adds.
atomic.AddInt64(&w.HashJoinCtx.stats.probe, probeTime)
atomic.AddInt64(&w.HashJoinCtx.stats.workerFetchAndProbe, int64(t))
// Offer this worker's values as candidates for the per-round maxima.
// NOTE(review): setMaxValue's body is not visible here; presumably it only
// raises the stored value when the candidate is larger — confirm.
setMaxValue(&w.HashJoinCtx.stats.maxProbeForCurrentRound, probeTime)
setMaxValue(&w.HashJoinCtx.stats.maxFetchAndProbeForCurrentRound, int64(t))
setMaxValue(&w.HashJoinCtx.stats.maxWorkerFetchAndProbeForCurrentRound, int64(t))
}

func (w *ProbeWorkerV2) restoreAndProbe(inDisk *chunk.DataInDiskByChunks, start time.Time) {
Expand Down Expand Up @@ -634,6 +635,8 @@ type HashJoinV2Exec struct {
prepared bool
inRestore bool

IsGA bool

isMemoryClearedForTest bool
}

Expand Down Expand Up @@ -748,6 +751,7 @@ func (e *HashJoinV2Exec) Open(ctx context.Context) error {
if e.stats != nil {
e.stats.reset()
e.stats.spill.partitionNum = int(e.partitionNumber)
e.stats.isHashJoinGA = e.IsGA
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/test/jointest/hashjoin/hash_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,12 @@ func TestExplainAnalyzeJoin(t *testing.T) {
rows = tk.MustQuery("explain analyze select /*+ INL_HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
require.Equal(t, 8, len(rows))
require.Regexp(t, "IndexHashJoin.*", rows[0][0])
require.Regexp(t, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}", rows[0][5])
require.Regexp(t, "time:.*, open:.*, close:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}", rows[0][5])
// Test for hash join.
rows = tk.MustQuery("explain analyze select /*+ HASH_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
require.Equal(t, 7, len(rows))
require.Regexp(t, "HashJoin.*", rows[0][0])
require.Regexp(t, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch and wait:.*}", rows[0][5])
require.Regexp(t, "time:.*, open:.*, close:.*, loops:.*, build_hash_table:{concurrency:.*, time:.*, fetch:.*, max_partition:.*, total_partition:.*, max_build:.*, total_build:.*}, probe:{concurrency:.*, time:.*, fetch_and_wait:.*, max_worker_time:.*, total_worker_time:.*, max_probe:.*, total_probe:.*}", rows[0][5])

// TestExplainAnalyzeIndexHashJoin
// Issue 43597
Expand All @@ -397,7 +397,7 @@ func TestExplainAnalyzeJoin(t *testing.T) {
require.Equal(t, 7, len(rows))
require.Regexp(t, "IndexHashJoin.*", rows[1][0])
// When innerWorkerRuntimeStats.join is negative, `join:` will not print.
require.Regexp(t, "time:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}", rows[1][5])
require.Regexp(t, "time:.*, open:.*, close:.*, loops:.*, inner:{total:.*, concurrency:.*, task:.*, construct:.*, fetch:.*, build:.*, join:.*}", rows[1][5])
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ func shouldSkipHashJoin(p *logicalop.LogicalJoin) bool {
return (p.PreferJoinType&h.PreferNoHashJoin) > 0 || (p.SCtx().GetSessionVars().DisableHashJoin)
}

func isGAForHashJoinV2(joinType logicalop.JoinType, leftJoinKeys []*expression.Column, isNullEQ []bool, leftNAJoinKeys []*expression.Column) bool {
// IsGAForHashJoinV2 reports whether this hash join is GA.
func IsGAForHashJoinV2(joinType logicalop.JoinType, leftJoinKeys []*expression.Column, isNullEQ []bool, leftNAJoinKeys []*expression.Column) bool {
// nullaware join
if len(leftNAJoinKeys) > 0 {
return false
Expand All @@ -397,7 +398,7 @@ func isGAForHashJoinV2(joinType logicalop.JoinType, leftJoinKeys []*expression.C

// canUseHashJoinV2 returns true if the current join is supported by hash join v2.
func canUseHashJoinV2(joinType logicalop.JoinType, leftJoinKeys []*expression.Column, isNullEQ []bool, leftNAJoinKeys []*expression.Column) bool {
if !isGAForHashJoinV2(joinType, leftJoinKeys, isNullEQ, leftNAJoinKeys) && !joinversion.UseHashJoinV2ForNonGAJoin {
if !IsGAForHashJoinV2(joinType, leftJoinKeys, isNullEQ, leftNAJoinKeys) && !joinversion.UseHashJoinV2ForNonGAJoin {
return false
}
switch joinType {
Expand Down
2 changes: 1 addition & 1 deletion tests/integrationtest/r/planner/core/cbo.result
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ id estRows actRows task access object execution info operator info memory disk
Projection_9 1.00 1 root NULL time:<num>, open:<num>, close:<num>, loops:<num>, RU:<num>, Concurrency:OFF planner__core__cbo.t1.a, planner__core__cbo.t1.b, Column#8 <num> N/A
└─StreamAgg_11 1.00 1 root NULL time:<num>, open:<num>, close:<num>, loops:<num> funcs:sum(Column#16)->Column#8, funcs:firstrow(Column#17)->planner__core__cbo.t1.a, funcs:firstrow(Column#18)->planner__core__cbo.t1.b <num> N/A
└─Projection_53 4.00 3 root NULL time:<num>, open:<num>, close:<num>, loops:<num>, Concurrency:OFF cast(planner__core__cbo.t1.c, decimal(10,0) BINARY)->Column#16, planner__core__cbo.t1.a->Column#17, planner__core__cbo.t1.b->Column#18 <num> N/A
└─HashJoin_51 4.00 3 root NULL time:<num>, open:<num>, close:<num>, loops:<num>, build_hash_table:{total:<num>, fetch:<num>, build:<num>}, probe:{concurrency:<num>, total:<num>, max:<num>, probe:<num>, fetch and wait:<num>} inner join, equal:[eq(planner__core__cbo.t1.a, planner__core__cbo.t2.b)] <num> <num>
└─HashJoin_51 4.00 3 root NULL time:<num>, open:<num>, close:<num>, loops:<num>, build_hash_table:{concurrency:<num>, time:<num>, fetch:<num>, max_partition:<num>, total_partition:<num>, max_build:<num>, total_build:<num>}, probe:{concurrency:<num>, time:<num>, fetch_and_wait:<num>, max_worker_time:<num>, total_worker_time:<num>, max_probe:<num>, total_probe:<num>} inner join, equal:[eq(planner__core__cbo.t1.a, planner__core__cbo.t2.b)] <num> <num>
├─TableReader_30(Build) 6.00 6 root NULL time.*loops.*cop_task.* data:Selection_29 <num> N/A
│ └─Selection_29 6.00 6 cop[tikv] NULL tikv_task:{time:<num>, loops:<num>} gt(planner__core__cbo.t2.b, 1), not(isnull(planner__core__cbo.t2.b)) N/A N/A
│ └─TableFullScan_28 6.00 6 cop[tikv] table:t2 tikv_task:{time:<num>, loops:<num>} keep order:false N/A N/A
Expand Down

0 comments on commit 8010959

Please sign in to comment.