diff --git a/pkg/sql/colexec/hashjoiner.go b/pkg/sql/colexec/hashjoiner.go index d4d15b397ced..2457f1855d5d 100644 --- a/pkg/sql/colexec/hashjoiner.go +++ b/pkg/sql/colexec/hashjoiner.go @@ -26,19 +26,25 @@ import ( type hashJoinerState int const ( - // hjBuilding represents the state the hashJoiner when it is in the build - // phase. Output columns from the build table are stored and a hash map is - // constructed from its equality columns. + // hjBuilding represents the state the hashJoiner is in when it is in the + // build phase. Output columns from the build table are stored and a hash + // map is constructed from its equality columns. hjBuilding = iota - // hjProbing represents the state the hashJoiner is in when it is in the probe - // phase. Probing is done in batches against the stored hash map. + // hjProbing represents the state the hashJoiner is in when it is in the + // probe phase. Probing is done in batches against the stored hash map. hjProbing // hjEmittingUnmatched represents the state the hashJoiner is in when it is // emitting unmatched rows from its build table after having consumed the // probe table. This happens in the case of an outer join on the build side. hjEmittingUnmatched + + // hjDone represents the state the hashJoiner is in when it has finished + // emitting all output rows. Note that the build side will have been fully + // consumed in this state, but the probe side *might* have not been fully + // consumed. + hjDone ) // hashJoinerSpec is the specification for a hash join operator. The hash @@ -236,18 +242,38 @@ func (hj *hashJoiner) Next(ctx context.Context) coldata.Batch { switch hj.state { case hjBuilding: hj.build(ctx) + if hj.ht.vals.Length() == 0 { + // The build side is empty, so we can short-circuit probing + // phase altogether for INNER, RIGHT OUTER, and LEFT SEMI + // joins. + if hj.spec.joinType == sqlbase.JoinType_INNER || + hj.spec.joinType == sqlbase.JoinType_RIGHT_OUTER || + hj.spec.joinType == sqlbase.JoinType_LEFT_SEMI { + hj.state = hjDone + } + } continue case hjProbing: hj.exec(ctx) - if hj.output.Length() == 0 && hj.spec.right.outer { - hj.state = hjEmittingUnmatched + if hj.output.Length() == 0 { + if hj.spec.right.outer { + hj.state = hjEmittingUnmatched + } else { + hj.state = hjDone + } continue } return hj.output case hjEmittingUnmatched: + if hj.emittingUnmatchedState.rowIdx == hj.ht.vals.Length() { + hj.state = hjDone + continue + } hj.emitUnmatched() return hj.output + case hjDone: + return coldata.ZeroBatch default: colexecerror.InternalError("hash joiner in unhandled state") // This code is unreachable, but the compiler cannot infer that. diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 0bdf6867d6d6..e769ac05763f 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -1168,8 +1168,7 @@ func TestHashJoinerProjection(t *testing.T) { hjOp, err := NewColOperator(ctx, flowCtx, args) require.NoError(t, err) hjOp.Op.Init() - for { - b := hjOp.Op.Next(ctx) + for b := hjOp.Op.Next(ctx); b.Length() > 0; b = hjOp.Op.Next(ctx) { // The output types should be {Int64, Int64, Bool, Decimal, Float64, Bytes} // and we check this explicitly. b.ColVec(0).Int64() @@ -1178,8 +1177,5 @@ func TestHashJoinerProjection(t *testing.T) { b.ColVec(3).Decimal() b.ColVec(4).Float64() b.ColVec(5).Bytes() - if b.Length() == 0 { - break - } } }