From 302ab0bffa31fa10cc668b5654d1435013778b6c Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 14 Nov 2024 14:42:07 -0800 Subject: [PATCH 01/15] [kvexec] merge join --- go/libraries/doltcore/sqle/kvexec/builder.go | 12 +- .../doltcore/sqle/kvexec/lookup_join.go | 12 - .../doltcore/sqle/kvexec/merge_join.go | 214 ++++++++++++++++++ 3 files changed, 225 insertions(+), 13 deletions(-) create mode 100644 go/libraries/doltcore/sqle/kvexec/merge_join.go diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index f9e91dd1690..18c77ee7a38 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -52,12 +52,22 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er split := len(srcTags) projections := append(srcTags, dstTags...) rowJoiner := newRowJoiner([]schema.Schema{srcSchema, dstIter.Schema()}, []int{split}, projections, dstIter.NodeStore()) - return rowIterTableLookupJoin(srcIter, dstIter, keyLookupMapper, rowJoiner, srcFilter, dstFilter, n.Filter, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()) + return newLookupKvIter(srcIter, dstIter, keyLookupMapper, rowJoiner, srcFilter, dstFilter, n.Filter, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()) } } } } } + if n.Op.IsMerge() && !n.Op.IsPartial() { + if leftMap, leftIter, _, leftSchema, leftTags, leftFilter, err := getSourceKv(ctx, n.Left(), true); err == nil && leftSchema != nil { + if _, rightIter, _, rightSchema, rightTags, rightFilter, err := getSourceKv(ctx, n.Left(), true); err == nil && rightSchema != nil { + split := len(leftTags) + projections := append(leftTags, rightTags...) 
+ rowJoiner := newRowJoiner([]schema.Schema{leftSchema, rightSchema}, []int{split}, projections, leftMap.NodeStore()) + return newMergeKvIter(leftIter, rightIter, leftMap, rowJoiner, leftFilter, rightFilter, n.Filter, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()) + } + } + } case *plan.GroupBy: if len(n.GroupByExprs) == 0 && len(n.SelectedExprs) == 1 { if cnt, ok := n.SelectedExprs[0].(*aggregation.Count); ok { diff --git a/go/libraries/doltcore/sqle/kvexec/lookup_join.go b/go/libraries/doltcore/sqle/kvexec/lookup_join.go index acf5b85d7c4..65bfebc927a 100644 --- a/go/libraries/doltcore/sqle/kvexec/lookup_join.go +++ b/go/libraries/doltcore/sqle/kvexec/lookup_join.go @@ -31,18 +31,6 @@ import ( "github.com/dolthub/dolt/go/store/val" ) -func rowIterTableLookupJoin( - srcIter prolly.MapIter, - dstIter index.SecondaryLookupIterGen, - mapping *lookupMapping, - rowJoiner *prollyToSqlJoiner, - srcFilter, dstFilter, joinFilter sql.Expression, - isLeftJoin bool, - excludeNulls bool, -) (sql.RowIter, error) { - return newLookupKvIter(srcIter, dstIter, mapping, rowJoiner, srcFilter, dstFilter, joinFilter, isLeftJoin, excludeNulls) -} - type lookupJoinKvIter struct { srcIter prolly.MapIter srcKey val.Tuple diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go new file mode 100644 index 00000000000..ceae1ea99d4 --- /dev/null +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -0,0 +1,214 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package kvexec + +import ( + "github.com/dolthub/dolt/go/store/prolly" + "github.com/dolthub/dolt/go/store/val" + "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/expression" + "io" +) + +func newMergeKvIter( + leftIter, rightIter prolly.MapIter, + leftMap prolly.Map, + joiner *prollyToSqlJoiner, + leftFilter, rightFilter, joinFilter sql.Expression, + isLeftJoin bool, + excludeNulls bool, +) (*mergeJoinKvIter, error) { + filters := expression.SplitConjunction(joinFilter) + cmp, ok := filters[0].(expression.Comparer) + if !ok { + if equality, ok := filters[0].(expression.Equality); ok { + var err error + cmp, err = equality.ToComparer() + if err != nil { + return nil, nil + } + } else { + return nil, nil + } + } + + if len(filters) == 0 { + return nil, sql.ErrNoJoinFilters.New() + } + + var lIdx, rIdx int + if l, ok := cmp.Left().(*expression.GetField); ok { + if r, ok := cmp.Right().(*expression.GetField); ok { + // get indices of get fields + lIdx = l.Index() + rIdx = r.Index() + } + } + + if lIdx == rIdx { + return nil, nil + } + + return &mergeJoinKvIter{ + leftIter: leftIter, + rightIter: rightIter, + joiner: joiner, + cmpDesc: leftMap.KeyDesc().PrefixDesc(1), + leftFilter: leftFilter, + rightFilter: rightFilter, + joinFilters: filters[1:], + isLeftJoin: isLeftJoin, + excludeNulls: excludeNulls, + }, nil +} + +type mergeJoinKvIter struct { + leftIter prolly.MapIter + leftKey val.Tuple + leftVal val.Tuple + + rightIter prolly.MapIter + rightKey val.Tuple + rightVal val.Tuple + + // todo convert comparer to be []byte-amenable callback + cmpDesc val.TupleDesc + + nextKey val.Tuple + nextVal val.Tuple + + lookaheadBuf [][]byte + matchPos int + + // projections + joiner *prollyToSqlJoiner + + // todo: we want to build KV-side static expression implementations + // so that we can execute filters more efficiently + leftFilter 
sql.Expression + rightFilter sql.Expression + joinFilters []sql.Expression + + // LEFT_JOIN impl details + excludeNulls bool + isLeftJoin bool + returnedARow bool +} + +var _ sql.RowIter = (*mergeJoinKvIter)(nil) + +func (l *mergeJoinKvIter) Close(_ *sql.Context) error { + return nil +} + +func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { + var err error + if l.leftKey == nil { + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + if err != nil { + return nil, err + } + } + + if len(l.lookaheadBuf) > 0 { + goto match + } + +incr: + // increment state + switch l.cmpDesc.Compare(l.leftKey, l.rightKey) { + case -1: + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + case 0: + goto matchBuf + case +1: + if l.nextKey != nil { + l.rightKey, l.rightVal = l.nextKey, l.nextVal + l.nextKey = nil + } else { + l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + if err != nil { + return nil, err + } + } + } + if l.leftKey == nil || l.rightKey == nil { + return nil, io.EOF + } + goto incr + +matchBuf: + // fill lookahead buffer with keys from right side + l.nextKey, l.nextVal, err = l.rightIter.Next(ctx) + if err != nil { + // TODO: EOF here? 
+ return nil, err + } + if l.cmpDesc.Compare(l.leftKey, l.nextKey) == 0 { + l.lookaheadBuf = append(l.lookaheadBuf, l.nextKey, l.nextVal) + goto matchBuf + } + +match: + // match state + // lookaheadBuf and rightKey + var candidate sql.Row + var rightKeyNil bool + if l.matchPos < len(l.lookaheadBuf) { + candidate, err = l.joiner.buildRow(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) + rightKeyNil = l.lookaheadBuf[l.matchPos] == nil + l.matchPos += 2 + } else if l.matchPos == len(l.lookaheadBuf) { + candidate, err = l.joiner.buildRow(ctx, l.leftKey, l.leftVal, l.rightKey, l.rightVal) + rightKeyNil = l.rightKey == nil + l.matchPos++ + } else { + // reset + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + if l.cmpDesc.Compare(l.leftKey, l.rightKey) != 0 { + l.lookaheadBuf = l.lookaheadBuf[:0] + l.matchPos = 0 + goto incr + } + } + if err != nil { + return nil, err + } + + // check filters + for _, f := range l.joinFilters { + res, err := sql.EvaluateCondition(ctx, f, candidate) + if err != nil { + return nil, err + } + if res == nil && l.excludeNulls { + // override default left join behavior + goto match + } else if !sql.IsTrue(res) && !rightKeyNil { + goto match + } + } + return candidate, nil +} From 4fa6227c36f54b0f103f679684262de91cf5f67f Mon Sep 17 00:00:00 2001 From: max-hoffman Date: Thu, 14 Nov 2024 22:52:27 +0000 Subject: [PATCH 02/15] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/kvexec/merge_join.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index ceae1ea99d4..6757330f495 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -15,11 +15,13 @@ package kvexec import ( - "github.com/dolthub/dolt/go/store/prolly" - 
"github.com/dolthub/dolt/go/store/val" + "io" + "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" - "io" + + "github.com/dolthub/dolt/go/store/prolly" + "github.com/dolthub/dolt/go/store/val" ) func newMergeKvIter( From 5993750b5e0869888ed19e1209af2d5c8008f78d Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 15 Nov 2024 09:19:49 -0800 Subject: [PATCH 03/15] fix null panic --- go/libraries/doltcore/sqle/kvexec/merge_join.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index ceae1ea99d4..270e0c924b8 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -15,6 +15,7 @@ package kvexec import ( + "fmt" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/val" "github.com/dolthub/go-mysql-server/sql" @@ -58,7 +59,7 @@ func newMergeKvIter( } if lIdx == rIdx { - return nil, nil + return nil, fmt.Errorf("unsupported merge comparison") } return &mergeJoinKvIter{ From 0a7e70a08c52561584a0d35d34773cdd7966b07b Mon Sep 17 00:00:00 2001 From: max-hoffman Date: Tue, 19 Nov 2024 03:44:32 +0000 Subject: [PATCH 04/15] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/kvexec/builder.go | 1 + go/libraries/doltcore/sqle/kvexec/merge_join.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index 0864ce9884d..9a9fc0f58b9 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -17,6 +17,7 @@ package kvexec import ( "context" "fmt" + "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" "github.com/dolthub/go-mysql-server/sql/expression/function/aggregation" diff --git 
a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index 53cff18c02e..feaf9b5d6d5 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -16,12 +16,12 @@ package kvexec import ( "errors" - "github.com/dolthub/dolt/go/libraries/doltcore/schema" "io" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/val" ) From 5fbca6182517ed851e4100d7f6eb9b358ffea775 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Tue, 19 Nov 2024 11:00:28 -0800 Subject: [PATCH 05/15] left joins --- .../doltcore/sqle/kvexec/merge_join.go | 140 +++++++++++++----- 1 file changed, 103 insertions(+), 37 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index 53cff18c02e..dc1fe5ca62c 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -70,13 +70,6 @@ func newMergeKvIter( // return nil, fmt.Errorf("unsupported merge comparison") //} - if leftFilter != nil { - joinFilters = append(joinFilters, leftFilter) - } - if rightFilter != nil { - joinFilters = append(joinFilters, rightFilter) - } - return &mergeJoinKvIter{ leftIter: leftIter, rightIter: rightIter, @@ -85,6 +78,8 @@ func newMergeKvIter( leftNorm: leftNorm, rightNorm: rightNorm, joinFilters: joinFilters, + leftFilter: leftFilter, + rightFilter: rightFilter, isLeftJoin: isLeftJoin, excludeNulls: excludeNulls, }, nil @@ -117,11 +112,13 @@ type mergeJoinKvIter struct { // todo: we want to build KV-side static expression implementations // so that we can execute filters more efficiently joinFilters []sql.Expression + leftFilter sql.Expression + rightFilter sql.Expression // LEFT_JOIN impl details excludeNulls bool isLeftJoin bool - 
returnedARow bool + matchedLeft bool } var _ sql.RowIter = (*mergeJoinKvIter)(nil) @@ -151,10 +148,23 @@ incr: // increment state switch l.cmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { case -1: + var oldLeftKey, oldLeftVal val.Tuple + { + // left join + if !l.matchedLeft && l.isLeftJoin { + oldLeftKey, oldLeftVal = l.leftKey, l.leftVal + } + l.matchedLeft = false + } + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) if err != nil { return nil, err } + + if oldLeftKey != nil { + return l.buildCandidate(ctx, oldLeftKey, oldLeftVal, nil, nil) + } case 0: goto matchBuf case +1: @@ -191,75 +201,131 @@ matchBuf: match: // match state // lookaheadBuf and rightKey - var candidate sql.Row - var rightKeyNil bool if l.matchPos < len(l.lookaheadBuf) { - candidate, err = l.buildCandidate(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) - rightKeyNil = l.lookaheadBuf[l.matchPos] == nil + candidate, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) + if err != nil { + return nil, err + } l.matchPos += 2 + if !ok { + goto match + } + return candidate, nil } else if l.matchPos == len(l.lookaheadBuf) { - candidate, err = l.buildCandidate(ctx, l.leftKey, l.leftVal, l.rightKey, l.rightVal) - rightKeyNil = l.rightKey == nil + candidate, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, l.rightKey, l.rightVal) + if err != nil { + return nil, err + } l.matchPos++ + if !ok { + goto match + } + return candidate, nil } else { // reset l.matchPos = 0 + // compare the current to the next left key - nextLeftKey, nextLeftVal, err := l.leftIter.Next(ctx) + tmpKey, tmpVal := l.leftKey, l.leftVal + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) if err != nil { return nil, err } - cmp := l.cmp(nextLeftKey, nextLeftVal, l.leftKey, l.leftVal) - l.leftKey, l.leftVal = nextLeftKey, nextLeftVal - if cmp == 0 { - // simple case -- the left keys are equivalent, maintain - // the right-side lookahead 
buffer - goto match + cmp := l.cmp(tmpKey, tmpVal, l.leftKey, l.leftVal) + + if cmp != 0 { + // if the left key is new, invalidate lookahead buffer and + // advance the right side + l.lookaheadBuf = l.lookaheadBuf[:0] + if l.nextRightKey != nil { + l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal + l.nextRightKey = nil + } else { + l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + if err != nil { + return nil, err + } + } } - // if the left key is new invalidate lookahead buffer and - // advance the right side - l.lookaheadBuf = l.lookaheadBuf[:0] - if l.nextRightKey != nil { - l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal - l.nextRightKey = nil - } else { - l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + + if l.isLeftJoin && !l.matchedLeft { + // consider left join after appropriate state transitions + l.matchedLeft = false + ret, ok, err := l.tryReturn(ctx, tmpKey, tmpVal, nil, nil) if err != nil { return nil, err } + if ok { + return ret, nil + } } - goto incr + l.matchedLeft = false + if cmp == 0 { + // the left keys are equivalent, the right-side lookahead + // buffer is still valid for the new left key. the only reason + // we do not do this check earlier is for left-joins. 
+ goto match + } + goto incr } +} + +func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, bool, error) { + candidate, err := l.buildCandidate(ctx, leftKey, leftVal, rightKey, rightVal) if err != nil { - return nil, err + return nil, false, err + } + + rightKeyNil := l.rightKey == nil + + if l.leftFilter != nil { + res, err := sql.EvaluateCondition(ctx, l.leftFilter, candidate[:l.joiner.kvSplits[0]]) + if err != nil { + return nil, false, err + } + if !sql.IsTrue(res) { + return nil, false, nil + } + } + + if l.rightFilter != nil { + res, err := sql.EvaluateCondition(ctx, l.rightFilter, candidate[l.joiner.kvSplits[0]:]) + if err != nil { + return nil, false, err + } + if !sql.IsTrue(res) { + return nil, false, nil + } } // check filters for _, f := range l.joinFilters { res, err := sql.EvaluateCondition(ctx, f, candidate) if err != nil { - return nil, err + return nil, false, err } if res == nil && l.excludeNulls { // override default left join behavior - goto match + return nil, false, nil } else if !sql.IsTrue(res) && !rightKeyNil { - goto match + return nil, false, nil } } - return candidate, nil + + l.matchedLeft = true + return candidate, true, nil } func (l *mergeJoinKvIter) buildCandidate(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, error) { var err error - if l.leftNorm != nil { + if l.leftNorm != nil && leftKey != nil { leftKey, leftVal, err = l.leftNorm(leftKey) if err != nil { return nil, err } } - if l.rightNorm != nil { + if l.rightNorm != nil && rightKey != nil { rightKey, rightVal, err = l.rightNorm(rightKey) if err != nil { return nil, err From adf0b5db067cc99ad9e697d154f8147e9aed688e Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Tue, 19 Nov 2024 13:13:51 -0800 Subject: [PATCH 06/15] join tests passing --- .../doltcore/sqle/index/index_reader.go | 15 +++- go/libraries/doltcore/sqle/kvexec/builder.go | 38 +++++--- .../doltcore/sqle/kvexec/merge_join.go | 86 
++++++++++++------- 3 files changed, 92 insertions(+), 47 deletions(-) diff --git a/go/libraries/doltcore/sqle/index/index_reader.go b/go/libraries/doltcore/sqle/index/index_reader.go index 3dd30f91ada..3b1bf1f432d 100644 --- a/go/libraries/doltcore/sqle/index/index_reader.go +++ b/go/libraries/doltcore/sqle/index/index_reader.go @@ -235,15 +235,15 @@ func (rp rangePartition) Key() []byte { return rp.key } -func MapForTableIndex(ctx *sql.Context, +func GetDurableIndex(ctx *sql.Context, tab DoltTableable, - idx DoltIndex) (prolly.Map, error) { + idx DoltIndex) (durable.Index, error) { di := idx.(*doltIndex) s, err := di.getDurableState(ctx, tab) if err != nil { - return prolly.Map{}, err + return nil, err } - return durable.ProllyMapFromIndex(s.Secondary), nil + return s.Secondary, nil } // IndexScanBuilder generates secondary lookups for partitions and @@ -658,6 +658,13 @@ func (ib *nonCoveringIndexImplBuilder) NewSecondaryIter(strict bool, cnt int, nu } } +func NewKeylessIndexImplBuilder(pri, sec durable.Index, idx DoltIndex) *keylessIndexImplBuilder { + return &keylessIndexImplBuilder{ + baseIndexImplBuilder: &baseIndexImplBuilder{idx: idx.(*doltIndex)}, + s: &durableIndexState{Primary: pri, Secondary: sec}, + } +} + // TODO keylessIndexImplBuilder should be similar to the non-covering // index case, where we will need to reference the primary index, // but can take advantage of point lookup optimizations diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index 0864ce9884d..7a91b7843b8 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -75,11 +75,11 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er // - usually key tuple, but for keyless tables it's val tuple. // - use primary table projections as reference for comparison // filter indexes. 
- if cmp, ok := mergeComparer(filters[0], lSecSch, rSecSch, projections, leftMap.KeyDesc(), leftMap.ValDesc(), rightMap.KeyDesc(), rightMap.ValDesc()); ok { + if lrCmp, llCmp, ok := mergeComparer(filters[0], lSecSch, rSecSch, projections, leftMap.KeyDesc(), leftMap.ValDesc(), rightMap.KeyDesc(), rightMap.ValDesc()); ok { split := len(leftTags) var rowJoiner *prollyToSqlJoiner rowJoiner = newRowJoiner([]schema.Schema{lPriSch, rPriSch}, []int{split}, projections, leftMap.NodeStore()) - if iter, err := newMergeKvIter(leftIter, rightIter, rowJoiner, cmp, leftNorm, rightNorm, leftFilter, rightFilter, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { + if iter, err := newMergeKvIter(leftIter, rightIter, rowJoiner, lrCmp, llCmp, leftNorm, rightNorm, leftFilter, rightFilter, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { return iter, nil } } @@ -433,7 +433,7 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem // covering normalizer to primary key/val - var m prolly.Map + var secMap prolly.Map var table *doltdb.Table var tags []uint64 var iter prolly.MapIter @@ -441,7 +441,6 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem var priSch schema.Schema var idxSch schema.Schema var idx index.DoltIndex - var err error switch n := n.(type) { case *plan.TableAlias: @@ -483,7 +482,11 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem return prolly.Map{}, nil, nil, nil, nil, nil, nil, nil } - m, err = index.MapForTableIndex(ctx, doltTable, idx) + secIdx, err := index.GetDurableIndex(ctx, doltTable, idx) + if err != nil { + return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + } + secMap = durable.ProllyMapFromIndex(secIdx) table, err = doltTable.DoltTable(ctx) if err != nil { return prolly.Map{}, nil, nil, nil, nil, nil, nil, err @@ -491,7 +494,6 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem tags = 
doltTable.ProjectedTags() idxSch = idx.IndexSchema() priSch = idx.Schema() - covering = idx.ID() == "primary" || schemaIsCovering(idx.IndexSchema(), tags) l, err := n.GetLookup(ctx, nil) if err != nil { return prolly.Map{}, nil, nil, nil, nil, nil, nil, err @@ -502,14 +504,28 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem return prolly.Map{}, nil, nil, nil, nil, nil, nil, err } - iter, err = index.NewSequenceRangeIter(ctx, index.NewSecondaryIterGen(m), prollyRanges, l.IsReverse) + var secIterGen index.IndexRangeIterable + if schema.IsKeyless(idx.Schema()) { + idxSch = idx.Schema() + priIndex, err := table.GetRowData(ctx) + if err != nil { + return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + } + secMap = durable.ProllyMapFromIndex(priIndex) + secIterGen = index.NewKeylessIndexImplBuilder(priIndex, secIdx, idx) + } else { + secIterGen = index.NewSecondaryIterGen(secMap) + } + + iter, err = index.NewSequenceRangeIter(ctx, secIterGen, prollyRanges, l.IsReverse) if err != nil { return prolly.Map{}, nil, nil, nil, nil, nil, nil, err } + covering = idx.ID() == "primary" || schemaIsCovering(idxSch, tags) if covering { // projections satisfied by idxSch - return m, iter, idxSch, idxSch, tags, nil, nil, nil + return secMap, iter, idxSch, idxSch, tags, nil, nil, nil } priIndex, err := table.GetRowData(ctx) @@ -525,9 +541,9 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem var covNorm coveringNormalizer = func(key val.Tuple) (val.Tuple, val.Tuple, error) { for to := range pkMap { from := pkMap.MapOrdinal(to) - pkBld.PutRaw(to, m.KeyDesc().GetField(from, key)) + pkBld.PutRaw(to, secMap.KeyDesc().GetField(from, key)) } - pk := pkBld.Build(m.Pool()) + pk := pkBld.Build(secMap.Pool()) var v val.Tuple err = priMap.Get(ctx, pk, func(key val.Tuple, value val.Tuple) error { v = value @@ -538,7 +554,7 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem } return pk, v, nil } - 
return m, iter, priSch, idxSch, tags, nil, covNorm, nil + return secMap, iter, priSch, idxSch, tags, nil, covNorm, nil default: return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("unsupported kvmerge child node") } diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index dc1fe5ca62c..abae7b13de7 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -29,7 +29,7 @@ import ( func newMergeKvIter( leftIter, rightIter prolly.MapIter, joiner *prollyToSqlJoiner, - comparer func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int, + lrComparer, llComparer func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int, leftNorm, rightNorm coveringNormalizer, leftFilter, rightFilter sql.Expression, joinFilters []sql.Expression, @@ -74,7 +74,8 @@ func newMergeKvIter( leftIter: leftIter, rightIter: rightIter, joiner: joiner, - cmp: comparer, + lrCmp: lrComparer, + llCmp: llComparer, leftNorm: leftNorm, rightNorm: rightNorm, joinFilters: joinFilters, @@ -94,8 +95,8 @@ type mergeJoinKvIter struct { rightKey val.Tuple rightVal val.Tuple - // todo convert comparer to be []byte-amenable callback - cmp func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int + lrCmp func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int + llCmp func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int leftNorm coveringNormalizer rightNorm coveringNormalizer @@ -146,7 +147,7 @@ func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { incr: // increment state - switch l.cmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { + switch l.lrCmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { case -1: var oldLeftKey, oldLeftVal val.Tuple { @@ -193,7 +194,7 @@ matchBuf: } return nil, err } - if l.cmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { + if l.lrCmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { l.lookaheadBuf = append(l.lookaheadBuf, l.nextRightKey, 
l.nextRightVal) goto matchBuf } @@ -231,7 +232,7 @@ match: if err != nil { return nil, err } - cmp := l.cmp(tmpKey, tmpVal, l.leftKey, l.leftVal) + cmp := l.llCmp(tmpKey, tmpVal, l.leftKey, l.leftVal) if cmp != 0 { // if the left key is new, invalidate lookahead buffer and @@ -341,7 +342,7 @@ func mergeComparer( leftSch, rightSch schema.Schema, projections []uint64, lKeyDesc, lValDesc, rKeyDesc, rValDesc val.TupleDesc, -) (ret func(leftKey, leftVal, rightKey, rightVal val.Tuple) int, ok bool) { +) (lrCmp, llCmp func(leftKey, leftVal, rightKey, rightVal val.Tuple) int, ok bool) { // first filter expression needs to be evaluated // can accept a subset of types -- (cmp GF GF) // need to map expression id to key or value position @@ -352,10 +353,10 @@ func mergeComparer( var err error cmp, err = equality.ToComparer() if err != nil { - return nil, false + return nil, nil, false } } else { - return nil, false + return nil, nil, false } } @@ -369,7 +370,7 @@ func mergeComparer( } if lIdx == rIdx { - return nil, false + return nil, nil, false } // |projections| and idx are in terms of output projections, @@ -380,41 +381,62 @@ func mergeComparer( rKeyIdx, rKeyOk := rightSch.GetPKCols().StoredIndexByTag(projections[rIdx]) rValIdx, rValOk := rightSch.GetNonPKCols().StoredIndexByTag(projections[rIdx]) + // first field in keyless value is cardinality + if schema.IsKeyless(leftSch) { + lValIdx++ + } + + if schema.IsKeyless(rightSch) { + rValIdx++ + } + var lTyp val.Type var rTyp val.Type - if lKeyOk && rKeyOk { + if lKeyOk { lTyp = lKeyDesc.Types[lKeyIdx] - rTyp = rKeyDesc.Types[rKeyIdx] - ret = func(leftKey, _, rightKey, _ val.Tuple) int { - return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightKey.GetField(rKeyIdx), lTyp) + llCmp = func(leftKey, _, rightKey, _ val.Tuple) int { + return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightKey.GetField(lKeyIdx), lTyp) } - } else if lKeyOk && rValOk { - lTyp = lKeyDesc.Types[lKeyIdx] - rTyp = rValDesc.Types[rValIdx] - 
ret = func(leftKey, _, _, rightVal val.Tuple) int { - return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightVal.GetField(rValIdx), lTyp) + if rKeyOk { + rTyp = rKeyDesc.Types[rKeyIdx] + lrCmp = func(leftKey, _, rightKey, _ val.Tuple) int { + return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightKey.GetField(rKeyIdx), lTyp) + } + } else if rValOk { + rTyp = rValDesc.Types[rValIdx] + lrCmp = func(leftKey, _, _, rightVal val.Tuple) int { + return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightVal.GetField(rValIdx), lTyp) + } + } else { + return nil, nil, false } - } else if lValOk && rKeyOk { + } else if lValOk { lTyp = lValDesc.Types[lValIdx] - rTyp = rKeyDesc.Types[rKeyIdx] - ret = func(_, leftVal, rightKey, _ val.Tuple) int { - return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightKey.GetField(rKeyIdx), lTyp) + llCmp = func(_, leftVal, _, rightVal val.Tuple) int { + return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightVal.GetField(lValIdx), lTyp) } - } else if lValOk && rValOk { - lTyp = lValDesc.Types[lValIdx] - rTyp = rValDesc.Types[rValIdx] - ret = func(_, leftVal, _, rightVal val.Tuple) int { - return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightVal.GetField(rValIdx), lTyp) + if rKeyOk { + rTyp = rKeyDesc.Types[rKeyIdx] + lrCmp = func(_, leftVal, rightKey, _ val.Tuple) int { + return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightKey.GetField(rKeyIdx), lTyp) + } + } else if rValOk { + rTyp = rValDesc.Types[rValIdx] + lrCmp = func(_, leftVal, _, rightVal val.Tuple) int { + return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightVal.GetField(rValIdx), lTyp) + } + } else { + return nil, nil, false } } else { - return nil, false + return nil, nil, false } if lTyp.Enc != rTyp.Enc { - return nil, false + return nil, nil, false } - return ret, true + return lrCmp, llCmp, true } func schemaIsCovering(sch schema.Schema, projections []uint64) bool { From bcb6718cd9a618ef797cdfc8253050ac0f30d19e Mon 
Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 20 Nov 2024 10:14:42 -0800 Subject: [PATCH 07/15] skip virtual cols --- go/libraries/doltcore/sqle/kvexec/builder.go | 45 ++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index 32d681319b3..6d289aad168 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -139,6 +139,7 @@ type prollyToSqlJoiner struct { kvSplits []int desc []kvDesc ordMappings []int + outCnt int } type kvDesc struct { @@ -170,6 +171,7 @@ func newRowJoiner(schemas []schema.Schema, splits []int, projections []uint64, n keyCols := sch.GetPKCols() valCols := sch.GetNonPKCols() splitIdx := 0 + virtualCnt := 0 for i := 0; i <= len(projections); i++ { // We will fill the map from table sources incrementally. Each source will have // a keyMapping, valueMapping, and ordinal mappings related to converting from @@ -181,20 +183,20 @@ func newRowJoiner(schemas []schema.Schema, splits []int, projections []uint64, n if i == splits[splitIdx] { var mappingStartIdx int if splitIdx > 0 { - mappingStartIdx = splits[splitIdx-1] + mappingStartIdx = splits[splitIdx-1] - virtualCnt } tupleDesc = append(tupleDesc, kvDesc{ keyDesc: sch.GetKeyDescriptor(), valDesc: sch.GetValueDescriptor(), - keyMappings: allMap[mappingStartIdx:nextKeyIdx], // prev kv partition -> last key of this partition - valMappings: allMap[nextKeyIdx:splits[splitIdx]], // first val of partition -> next kv partition + keyMappings: allMap[mappingStartIdx:nextKeyIdx], // prev kv partition -> last key of this partition + valMappings: allMap[nextKeyIdx : splits[splitIdx]-virtualCnt], // first val of partition -> next kv partition }) if i == len(projections) { break } - nextKeyIdx = splits[splitIdx] + nextKeyIdx = splits[splitIdx] - virtualCnt splitIdx++ - nextValIdx = splits[splitIdx] - 1 + nextValIdx = splits[splitIdx] - 1 - 
virtualCnt sch = schemas[splitIdx] keylessOff = 0 @@ -213,14 +215,22 @@ func newRowJoiner(schemas []schema.Schema, splits []int, projections []uint64, n allMap[nextValIdx] = idx + keylessOff allMap[numPhysicalColumns+nextValIdx] = i nextValIdx-- + } else { + virtualCnt++ } } + kvSplits := make([]int, len(splits)) + for i := range splits { + kvSplits[i] = splits[i] - virtualCnt + } + return &prollyToSqlJoiner{ - kvSplits: splits, + kvSplits: kvSplits, desc: tupleDesc, ordMappings: allMap[numPhysicalColumns:], ns: ns, + outCnt: len(projections), } } @@ -228,7 +238,7 @@ func (m *prollyToSqlJoiner) buildRow(ctx context.Context, tuples ...val.Tuple) ( if len(tuples) != 2*len(m.desc) { panic("invalid KV count for prollyToSqlJoiner") } - row := make(sql.Row, len(m.ordMappings)) + row := make(sql.Row, m.outCnt) split := 0 var err error var tup val.Tuple @@ -277,7 +287,7 @@ func getPhysicalColCount(schemas []schema.Schema, splits []int, projections []ui sch := schemas[0] splitIdx := 0 for i := 0; i < len(projections); i++ { - if i == splits[splitIdx] { + if splitIdx < len(splits) && i == splits[splitIdx] { splitIdx++ sch = schemas[splitIdx] } @@ -311,6 +321,10 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M } return m, secM, mIter, destIter, s, nil, t, n.Expression, nil case *plan.IndexedTableAccess: + if _, ok := plan.FindVirtualColumnTable(n.Table); ok { + return prolly.Map{}, prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("virtual tables unsupported in kvexec") + } + var lb index.IndexScanBuilder switch dt := n.UnderlyingTable().(type) { case *sqle.WritableIndexedDoltTable: @@ -425,15 +439,6 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M type coveringNormalizer func(val.Tuple) (val.Tuple, val.Tuple, error) func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schema.Schema, schema.Schema, []uint64, sql.Expression, coveringNormalizer, error) { - // merge kv is different 
from lookup KV: - // - embed the non-coverign lookup at the iter layer - // - the map and schema will depend on covering - - // one schema for merge comparison - // other schema for row joiner - - // covering normalizer to primary key/val - var secMap prolly.Map var table *doltdb.Table var tags []uint64 @@ -468,6 +473,12 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem } return m, mIter, destIter, s, t, expr, norm, nil case *plan.IndexedTableAccess: + if _, ok := plan.FindVirtualColumnTable(n.Table); ok { + // TODO pass projection through to iterator to materialize + // virtual cols + return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("virtual tables unsupported in kvexec") + } + var doltTable *sqle.DoltTable switch dt := n.UnderlyingTable().(type) { case *sqle.WritableIndexedDoltTable: From c1e9358978754bc95d88d16a17b2b285459aae07 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 20 Nov 2024 11:36:37 -0800 Subject: [PATCH 08/15] more left join bugs --- .../doltcore/sqle/kvexec/merge_join.go | 75 ++++++++++--------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index a8d3219ffb3..9c3b2f40070 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -36,40 +36,6 @@ func newMergeKvIter( isLeftJoin bool, excludeNulls bool, ) (*mergeJoinKvIter, error) { - // - //cmp, ok := filters[0].(expression.Comparer) - //if !ok { - // if equality, ok := filters[0].(expression.Equality); ok { - // var err error - // cmp, err = equality.ToComparer() - // if err != nil { - // return nil, nil - // } - // } else { - // return nil, nil - // } - //} - // - //if len(filters) == 0 { - // return nil, sql.ErrNoJoinFilters.New() - //} - // - //var lIdx, rIdx int - //if l, ok := cmp.Left().(*expression.GetField); ok { - // if r, ok := cmp.Right().(*expression.GetField); ok 
{ - // // get indices of get fields - // lIdx = l.Index() - // rIdx = r.Index() - // } - //} - // - //if lIdx == rIdx { - // return nil, fmt.Errorf("unsupported merge comparison") - //} - //if lIdx != 0 || rIdx != joiner.kvSplits[0] { - // return nil, fmt.Errorf("unsupported merge comparison") - //} - return &mergeJoinKvIter{ leftIter: leftIter, rightIter: rightIter, @@ -120,6 +86,7 @@ type mergeJoinKvIter struct { excludeNulls bool isLeftJoin bool matchedLeft bool + exhaustLeft bool } var _ sql.RowIter = (*mergeJoinKvIter)(nil) @@ -137,10 +104,17 @@ func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { } l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) if err != nil { + if errors.Is(err, io.EOF) && l.isLeftJoin { + goto exhaustLeft + } return nil, err } } + if l.exhaustLeft { + goto exhaustLeft + } + if len(l.lookaheadBuf) > 0 || l.matchPos > 0 { goto match } @@ -175,6 +149,9 @@ incr: } else { l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) if err != nil { + if errors.Is(err, io.EOF) && l.isLeftJoin { + goto exhaustLeft + } return nil, err } } @@ -244,14 +221,17 @@ match: } else { l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) if err != nil { - return nil, err + if errors.Is(err, io.EOF) && l.isLeftJoin { + l.exhaustLeft = true + } else { + return nil, err + } } } } if l.isLeftJoin && !l.matchedLeft { // consider left join after appropriate state transitions - l.matchedLeft = false ret, ok, err := l.tryReturn(ctx, tmpKey, tmpVal, nil, nil) if err != nil { return nil, err @@ -268,8 +248,29 @@ match: // we do not do this check earlier is for left-joins. 
goto match } + if l.exhaustLeft { + goto exhaustLeft + } goto incr } + +exhaustLeft: + l.exhaustLeft = true + if l.matchedLeft { + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + } + l.matchedLeft = true // simplifies loop + ret, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return ret, nil + } + goto exhaustLeft } func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, bool, error) { @@ -290,7 +291,7 @@ func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey } } - if l.rightFilter != nil { + if l.rightFilter != nil && !rightKeyNil { res, err := sql.EvaluateCondition(ctx, l.rightFilter, candidate[l.joiner.kvSplits[0]:]) if err != nil { return nil, false, err From 6ae632002b20ad227a41e52fca2a06bc98ca05da Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 20 Nov 2024 12:02:10 -0800 Subject: [PATCH 09/15] better documentation --- .../doltcore/sqle/kvexec/merge_join.go | 55 ++++++++++++++----- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index 9c3b2f40070..b3e0e12dc5d 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -64,6 +64,11 @@ type mergeJoinKvIter struct { lrCmp func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int llCmp func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int + // Non-covering secondary index reads are faster separated + // from the initial mapIter, because we avoid unnecessary + // primary reads. Note: keyless indexes are less amenable to + // this optimization because their cardinality is stored in the + // primary index. 
leftNorm coveringNormalizer rightNorm coveringNormalizer @@ -98,12 +103,7 @@ func (l *mergeJoinKvIter) Close(_ *sql.Context) error { func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { var err error if l.leftKey == nil { - l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) - if err != nil { - return nil, err - } - l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) - if err != nil { + if err := l.initialize(ctx); err != nil { if errors.Is(err, io.EOF) && l.isLeftJoin { goto exhaustLeft } @@ -120,12 +120,15 @@ func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { } incr: - // increment state + // compare the left/right keys. + // if equal continue to match buffer stage + // if unequal increment one side switch l.lrCmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { case -1: + // left side has to consider left join non-matches var oldLeftKey, oldLeftVal val.Tuple { - // left join + // left join state if !l.matchedLeft && l.isLeftJoin { oldLeftKey, oldLeftVal = l.leftKey, l.leftVal } @@ -143,6 +146,7 @@ incr: case 0: goto matchBuf case +1: + // right side considers lookahead used for match buffer if l.nextRightKey != nil { l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal l.nextRightKey = nil @@ -162,7 +166,9 @@ incr: goto incr matchBuf: - // fill lookahead buffer with keys from right side + // Fill lookahead buffer with all right side keys that match current + // left key. |l.nextRightKey| can be a lookahead key or nil at the + // end of this stage. l.nextRightKey, l.nextRightVal, err = l.rightIter.Next(ctx) if err != nil { if errors.Is(err, io.EOF) { @@ -177,8 +183,10 @@ matchBuf: } match: - // match state - // lookaheadBuf and rightKey + // We start this stage with at least one match. We consider the + // lookahead buffer matches before |l.rightKey| because we can + // track state with a single |l.matchPos| variable. Matching merge + // condition does not guarantee the rest of the filters will match. 
if l.matchPos < len(l.lookaheadBuf) { candidate, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) if err != nil { @@ -200,7 +208,9 @@ match: } return candidate, nil } else { - // reset + // We exhausted matches for the current |l.leftKey|. + // See whether we should flush or reuse the right buffer + // with the next left key. l.matchPos = 0 // compare the current to the next left key @@ -222,6 +232,8 @@ match: l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) if err != nil { if errors.Is(err, io.EOF) && l.isLeftJoin { + // need to check |tmpKey| for left-join before + // jumping to exhaustLeft l.exhaustLeft = true } else { return nil, err @@ -243,9 +255,9 @@ match: l.matchedLeft = false if cmp == 0 { - // the left keys are equivalent, the right-side lookahead - // buffer is still valid for the new left key. the only reason - // we do not do this check earlier is for left-joins. + // The left keys are equivalent, the right-side lookahead + // buffer is still valid for the new left key. The only reason + // we didn't short-circuit this check earlier is for left-joins. 
goto match } if l.exhaustLeft { @@ -336,6 +348,19 @@ func (l *mergeJoinKvIter) buildCandidate(ctx *sql.Context, leftKey, leftVal, rig return l.joiner.buildRow(ctx, leftKey, leftVal, rightKey, rightVal) } +func (l *mergeJoinKvIter) initialize(ctx *sql.Context) error { + var err error + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return err + } + l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + if err != nil { + return err + } + return nil +} + var defCmp = val.DefaultTupleComparator{} func mergeComparer( From d9e893a7c1902887f6974d5a12846b194a196887 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 20 Nov 2024 13:22:28 -0800 Subject: [PATCH 10/15] more bugs --- .../doltcore/sqle/kvexec/merge_join.go | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index b3e0e12dc5d..e7b8ace626c 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -137,11 +137,27 @@ incr: l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) if err != nil { + if errors.Is(err, io.EOF) && oldLeftKey != nil { + l.exhaustLeft = true + candidate, ok, err := l.tryReturn(ctx, oldLeftKey, oldLeftVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return candidate, nil + } + } return nil, err } if oldLeftKey != nil { - return l.buildCandidate(ctx, oldLeftKey, oldLeftVal, nil, nil) + candidate, ok, err := l.tryReturn(ctx, oldLeftKey, oldLeftVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return candidate, nil + } } case 0: goto matchBuf @@ -193,6 +209,7 @@ match: return nil, err } l.matchPos += 2 + l.matchedLeft = ok if !ok { goto match } @@ -202,6 +219,7 @@ match: if err != nil { return nil, err } + l.matchedLeft = ok l.matchPos++ if !ok { goto match @@ -268,6 +286,9 @@ match: exhaustLeft: l.exhaustLeft = true + if l.leftKey == nil { + return nil, 
io.EOF + } if l.matchedLeft { l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) if err != nil { @@ -291,7 +312,7 @@ func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey return nil, false, err } - rightKeyNil := l.rightKey == nil + rightKeyNil := rightKey == nil if l.leftFilter != nil { res, err := sql.EvaluateCondition(ctx, l.leftFilter, candidate[:l.joiner.kvSplits[0]]) @@ -327,7 +348,6 @@ func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey } } - l.matchedLeft = true return candidate, true, nil } From b76555f8c8eb6937d852ce3fa16bbc5687091c9c Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Wed, 20 Nov 2024 14:58:53 -0800 Subject: [PATCH 11/15] more bugs --- go/libraries/doltcore/sqle/kvexec/builder.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index 6d289aad168..a37ffaf566d 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -45,6 +45,7 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er // - parent row index shifts // - fusing kvexec operators // - compatible |val| encodings that we don't coerce + // - filter/project ordering clash switch n := n.(type) { case *plan.JoinNode: @@ -462,6 +463,11 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem if err != nil { return prolly.Map{}, nil, nil, nil, nil, nil, nil, err } + if expr != nil { + // TODO: cannot pre-project before the filter here, need to + // properly order the nodes + return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("kvmerge projection/filter clash") + } var newTags []uint64 for _, e := range n.Projections { switch e := e.(type) { @@ -471,7 +477,7 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem return prolly.Map{}, nil, nil, nil, nil, nil, nil, 
fmt.Errorf("unsupported kvmerge projection") } } - return m, mIter, destIter, s, t, expr, norm, nil + return m, mIter, destIter, s, newTags, expr, norm, nil case *plan.IndexedTableAccess: if _, ok := plan.FindVirtualColumnTable(n.Table); ok { // TODO pass projection through to iterator to materialize From f67d302c77e15e741b035a7f877dfdc25357752a Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Thu, 21 Nov 2024 14:39:58 -0800 Subject: [PATCH 12/15] simplify arg passing --- go/libraries/doltcore/sqle/kvexec/builder.go | 117 ++++++++++-------- .../doltcore/sqle/kvexec/merge_join.go | 27 ++-- 2 files changed, 80 insertions(+), 64 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index a37ffaf566d..25d8c2ffe99 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -49,7 +49,10 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er switch n := n.(type) { case *plan.JoinNode: - if n.Op.IsLookup() && !n.Op.IsPartial() { + switch { + case n.Op.IsPartial() || len(r) != 0: + return nil, nil + case n.Op.IsLookup(): if ita, ok := getIta(n.Right()); ok && len(r) == 0 && simpleLookupExpressions(ita.Expressions()) { if _, _, _, dstIter, _, _, dstTags, dstFilter, err := getSourceKv(ctx, n.Right(), false); err == nil && dstIter != nil { if srcMap, _, srcIter, _, srcSchema, _, srcTags, srcFilter, err := getSourceKv(ctx, n.Left(), true); err == nil && srcSchema != nil { @@ -67,21 +70,20 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er } } } - } - if n.Op.IsMerge() && !n.Op.IsPartial() && len(r) == 0 { - if leftMap, leftIter, lPriSch, lSecSch, leftTags, leftFilter, leftNorm, err := getMergeKv(ctx, n.Left()); err == nil { - if rightMap, rightIter, rPriSch, rSecSch, rightTags, rightFilter, rightNorm, err := getMergeKv(ctx, n.Right()); err == nil { + case n.Op.IsMerge(): + if leftState, err := 
getMergeKv(ctx, n.Left()); err == nil { + if rightState, err := getMergeKv(ctx, n.Right()); err == nil { filters := expression.SplitConjunction(n.Filter) - projections := append(leftTags, rightTags...) + projections := append(leftState.tags, rightState.tags...) // - secondary indexes are source of comparison columns. // - usually key tuple, but for keyless tables it's val tuple. // - use primary table projections as reference for comparison // filter indexes. - if lrCmp, llCmp, ok := mergeComparer(filters[0], lSecSch, rSecSch, projections, leftMap.KeyDesc(), leftMap.ValDesc(), rightMap.KeyDesc(), rightMap.ValDesc()); ok { - split := len(leftTags) + if lrCmp, llCmp, ok := mergeComparer(filters[0], leftState, rightState, projections); ok { + split := len(leftState.tags) var rowJoiner *prollyToSqlJoiner - rowJoiner = newRowJoiner([]schema.Schema{lPriSch, rPriSch}, []int{split}, projections, leftMap.NodeStore()) - if iter, err := newMergeKvIter(leftIter, rightIter, rowJoiner, lrCmp, llCmp, leftNorm, rightNorm, leftFilter, rightFilter, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { + rowJoiner = newRowJoiner([]schema.Schema{leftState.priSch, rightState.priSch}, []int{split}, projections, leftState.idxMap.NodeStore()) + if iter, err := newMergeKvIter(leftState.iter, rightState.iter, rowJoiner, lrCmp, llCmp, leftState.norm, rightState.norm, leftState.filter, rightState.filter, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { return iter, nil } } @@ -439,50 +441,63 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M type coveringNormalizer func(val.Tuple) (val.Tuple, val.Tuple, error) -func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schema.Schema, schema.Schema, []uint64, sql.Expression, coveringNormalizer, error) { - var secMap prolly.Map +type mergeState struct { + idxMap prolly.Map + iter prolly.MapIter + priSch schema.Schema + idxSch schema.Schema + tags []uint64 + filter 
sql.Expression + norm coveringNormalizer +} + +func getMergeKv(ctx *sql.Context, n sql.Node) (mergeState, error) { + ms := mergeState{} + //var secMap prolly.Map var table *doltdb.Table - var tags []uint64 - var iter prolly.MapIter + //var tags []uint64 + //var iter prolly.MapIter var covering bool - var priSch schema.Schema - var idxSch schema.Schema + //var priSch schema.Schema + //var idxSch schema.Schema var idx index.DoltIndex switch n := n.(type) { case *plan.TableAlias: return getMergeKv(ctx, n.Child) case *plan.Filter: - m, mIter, destIter, s, t, _, norm, err := getMergeKv(ctx, n.Child) + ms, err := getMergeKv(ctx, n.Child) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - return m, mIter, destIter, s, t, n.Expression, norm, nil + ms.filter = n.Expression + return ms, nil case *plan.Project: - m, mIter, destIter, s, t, expr, norm, err := getMergeKv(ctx, n.Child) + ms, err := getMergeKv(ctx, n.Child) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - if expr != nil { + if ms.filter != nil { // TODO: cannot pre-project before the filter here, need to // properly order the nodes - return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("kvmerge projection/filter clash") + return ms, fmt.Errorf("kvmerge projection/filter clash") } var newTags []uint64 for _, e := range n.Projections { switch e := e.(type) { case *expression.GetField: - newTags = append(newTags, t[e.Index()]) + newTags = append(newTags, ms.tags[e.Index()]) default: - return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("unsupported kvmerge projection") + return ms, fmt.Errorf("unsupported kvmerge projection") } } - return m, mIter, destIter, s, newTags, expr, norm, nil + ms.tags = newTags + return ms, nil case *plan.IndexedTableAccess: if _, ok := plan.FindVirtualColumnTable(n.Table); ok { // TODO pass projection through to iterator to materialize // virtual cols - return prolly.Map{}, nil, nil, 
nil, nil, nil, nil, fmt.Errorf("virtual tables unsupported in kvexec") + return ms, fmt.Errorf("virtual tables unsupported in kvexec") } var doltTable *sqle.DoltTable @@ -497,58 +512,60 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem //case *dtables.DiffTable: // TODO: add interface to include system tables default: - return prolly.Map{}, nil, nil, nil, nil, nil, nil, nil + return ms, nil } secIdx, err := index.GetDurableIndex(ctx, doltTable, idx) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - secMap = durable.ProllyMapFromIndex(secIdx) + ms.idxMap = durable.ProllyMapFromIndex(secIdx) table, err = doltTable.DoltTable(ctx) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - tags = doltTable.ProjectedTags() - idxSch = idx.IndexSchema() - priSch = idx.Schema() + ms.tags = doltTable.ProjectedTags() + ms.idxSch = idx.IndexSchema() + ms.priSch = idx.Schema() l, err := n.GetLookup(ctx, nil) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } prollyRanges, err := index.ProllyRangesForIndex(ctx, l.Index, l.Ranges) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } var secIterGen index.IndexRangeIterable if schema.IsKeyless(idx.Schema()) { - idxSch = idx.Schema() + ms.idxSch = idx.Schema() priIndex, err := table.GetRowData(ctx) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - secMap = durable.ProllyMapFromIndex(priIndex) + ms.idxMap = durable.ProllyMapFromIndex(priIndex) secIterGen = index.NewKeylessIndexImplBuilder(priIndex, secIdx, idx) } else { - secIterGen = index.NewSecondaryIterGen(secMap) + secIterGen = index.NewSecondaryIterGen(ms.idxMap) } - iter, err = index.NewSequenceRangeIter(ctx, secIterGen, prollyRanges, l.IsReverse) + ms.iter, err = index.NewSequenceRangeIter(ctx, secIterGen, prollyRanges, l.IsReverse) if err != 
nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } - covering = idx.ID() == "primary" || schemaIsCovering(idxSch, tags) + covering = idx.ID() == "primary" || schemaIsCovering(ms.idxSch, ms.tags) if covering { // projections satisfied by idxSch - return secMap, iter, idxSch, idxSch, tags, nil, nil, nil + ms.priSch = ms.idxSch + return ms, nil + //return secMap, iter, idxSch, idxSch, tags, nil, nil, nil } priIndex, err := table.GetRowData(ctx) if err != nil { - return prolly.Map{}, nil, nil, nil, nil, nil, nil, err + return ms, err } priMap := durable.ProllyMapFromIndex(priIndex) @@ -556,12 +573,12 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem priKd, _ := priMap.Descriptors() pkBld := val.NewTupleBuilder(priKd) - var covNorm coveringNormalizer = func(key val.Tuple) (val.Tuple, val.Tuple, error) { + ms.norm = func(key val.Tuple) (val.Tuple, val.Tuple, error) { for to := range pkMap { from := pkMap.MapOrdinal(to) - pkBld.PutRaw(to, secMap.KeyDesc().GetField(from, key)) + pkBld.PutRaw(to, ms.idxMap.KeyDesc().GetField(from, key)) } - pk := pkBld.Build(secMap.Pool()) + pk := pkBld.Build(ms.idxMap.Pool()) var v val.Tuple err = priMap.Get(ctx, pk, func(key val.Tuple, value val.Tuple) error { v = value @@ -572,8 +589,8 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (prolly.Map, prolly.MapIter, schem } return pk, v, nil } - return secMap, iter, priSch, idxSch, tags, nil, covNorm, nil + return ms, nil default: - return prolly.Map{}, nil, nil, nil, nil, nil, nil, fmt.Errorf("unsupported kvmerge child node") + return ms, fmt.Errorf("unsupported kvmerge child node") } } diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index e7b8ace626c..2e96f900f71 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -385,9 +385,8 @@ var defCmp = val.DefaultTupleComparator{} func mergeComparer( filter 
sql.Expression, - leftSch, rightSch schema.Schema, + lState, rState mergeState, projections []uint64, - lKeyDesc, lValDesc, rKeyDesc, rValDesc val.TupleDesc, ) (lrCmp, llCmp func(leftKey, leftVal, rightKey, rightVal val.Tuple) int, ok bool) { // first filter expression needs to be evaluated // can accept a subset of types -- (cmp GF GF) @@ -422,34 +421,34 @@ func mergeComparer( // |projections| and idx are in terms of output projections, // but we need tuple and position in terms of secondary index. // Use tags for the mapping. - lKeyIdx, lKeyOk := leftSch.GetPKCols().StoredIndexByTag(projections[lIdx]) - lValIdx, lValOk := leftSch.GetNonPKCols().StoredIndexByTag(projections[lIdx]) - rKeyIdx, rKeyOk := rightSch.GetPKCols().StoredIndexByTag(projections[rIdx]) - rValIdx, rValOk := rightSch.GetNonPKCols().StoredIndexByTag(projections[rIdx]) + lKeyIdx, lKeyOk := lState.idxSch.GetPKCols().StoredIndexByTag(projections[lIdx]) + lValIdx, lValOk := lState.idxSch.GetNonPKCols().StoredIndexByTag(projections[lIdx]) + rKeyIdx, rKeyOk := rState.idxSch.GetPKCols().StoredIndexByTag(projections[rIdx]) + rValIdx, rValOk := rState.idxSch.GetNonPKCols().StoredIndexByTag(projections[rIdx]) // first field in keyless value is cardinality - if schema.IsKeyless(leftSch) { + if schema.IsKeyless(lState.idxSch) { lValIdx++ } - if schema.IsKeyless(rightSch) { + if schema.IsKeyless(rState.idxSch) { rValIdx++ } var lTyp val.Type var rTyp val.Type if lKeyOk { - lTyp = lKeyDesc.Types[lKeyIdx] + lTyp = lState.idxMap.KeyDesc().Types[lKeyIdx] llCmp = func(leftKey, _, rightKey, _ val.Tuple) int { return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightKey.GetField(lKeyIdx), lTyp) } if rKeyOk { - rTyp = rKeyDesc.Types[rKeyIdx] + rTyp = rState.idxMap.KeyDesc().Types[rKeyIdx] lrCmp = func(leftKey, _, rightKey, _ val.Tuple) int { return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightKey.GetField(rKeyIdx), lTyp) } } else if rValOk { - rTyp = rValDesc.Types[rValIdx] + rTyp = 
rState.idxMap.ValDesc().Types[rValIdx] lrCmp = func(leftKey, _, _, rightVal val.Tuple) int { return defCmp.CompareValues(0, leftKey.GetField(lKeyIdx), rightVal.GetField(rValIdx), lTyp) } @@ -457,17 +456,17 @@ func mergeComparer( return nil, nil, false } } else if lValOk { - lTyp = lValDesc.Types[lValIdx] + lTyp = lState.idxMap.ValDesc().Types[lValIdx] llCmp = func(_, leftVal, _, rightVal val.Tuple) int { return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightVal.GetField(lValIdx), lTyp) } if rKeyOk { - rTyp = rKeyDesc.Types[rKeyIdx] + rTyp = rState.idxMap.KeyDesc().Types[rKeyIdx] lrCmp = func(_, leftVal, rightKey, _ val.Tuple) int { return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightKey.GetField(rKeyIdx), lTyp) } } else if rValOk { - rTyp = rValDesc.Types[rValIdx] + rTyp = rState.idxMap.ValDesc().Types[rValIdx] lrCmp = func(_, leftVal, _, rightVal val.Tuple) int { return defCmp.CompareValues(0, leftVal.GetField(lValIdx), rightVal.GetField(rValIdx), lTyp) } From 05c79da8f23a9f7f99da4de1ae21f11dec6ee797 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 22 Nov 2024 11:01:11 -0800 Subject: [PATCH 13/15] comments --- go/libraries/doltcore/sqle/kvexec/builder.go | 32 +- .../doltcore/sqle/kvexec/merge_join.go | 354 ++++++++++-------- 2 files changed, 216 insertions(+), 170 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index 25d8c2ffe99..c0de4c1b260 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -17,6 +17,7 @@ package kvexec import ( "context" "fmt" + "github.com/dolthub/dolt/go/store/types" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" @@ -83,7 +84,7 @@ func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, er split := len(leftState.tags) var rowJoiner *prollyToSqlJoiner rowJoiner = newRowJoiner([]schema.Schema{leftState.priSch, 
rightState.priSch}, []int{split}, projections, leftState.idxMap.NodeStore()) - if iter, err := newMergeKvIter(leftState.iter, rightState.iter, rowJoiner, lrCmp, llCmp, leftState.norm, rightState.norm, leftState.filter, rightState.filter, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { + if iter, err := newMergeKvIter(leftState, rightState, rowJoiner, lrCmp, llCmp, filters, n.Op.IsLeftOuter(), n.Op.IsExcludeNulls()); err == nil { return iter, nil } } @@ -360,6 +361,9 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M if err != nil { return prolly.Map{}, prolly.Map{}, nil, nil, nil, nil, nil, nil, err } + if rowData.Format() != types.Format_DOLT { + return prolly.Map{}, prolly.Map{}, nil, nil, nil, nil, nil, nil, nil + } priMap = durable.ProllyMapFromIndex(rowData) priSch = lb.OutputSchema() @@ -439,27 +443,33 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M return priMap, secMap, srcIter, dstIter, priSch, nil, tags, nil, nil } +// coveringNormalizer inputs a secondary index key tuple and outputs a +// primary index key/value tuple. type coveringNormalizer func(val.Tuple) (val.Tuple, val.Tuple, error) type mergeState struct { + // secondary index being read idxMap prolly.Map - iter prolly.MapIter + // merge iterator + iter prolly.MapIter + // schemas for primary and secondary index. + // if the index is covering these are the same priSch schema.Schema idxSch schema.Schema - tags []uint64 + // output projection ordering + tags []uint64 + // filter for just this relation filter sql.Expression - norm coveringNormalizer + // norm is not nil when a non-covering index + // needs a callback into the primary index. 
+ norm coveringNormalizer } func getMergeKv(ctx *sql.Context, n sql.Node) (mergeState, error) { ms := mergeState{} - //var secMap prolly.Map + var table *doltdb.Table - //var tags []uint64 - //var iter prolly.MapIter var covering bool - //var priSch schema.Schema - //var idxSch schema.Schema var idx index.DoltIndex switch n := n.(type) { @@ -515,6 +525,10 @@ func getMergeKv(ctx *sql.Context, n sql.Node) (mergeState, error) { return ms, nil } + if idx.Format() != types.Format_DOLT { + return ms, nil + } + secIdx, err := index.GetDurableIndex(ctx, doltTable, idx) if err != nil { return ms, err diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index 2e96f900f71..a62623686cd 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -27,26 +27,24 @@ import ( ) func newMergeKvIter( - leftIter, rightIter prolly.MapIter, + leftState, rightState mergeState, joiner *prollyToSqlJoiner, lrComparer, llComparer func(val.Tuple, val.Tuple, val.Tuple, val.Tuple) int, - leftNorm, rightNorm coveringNormalizer, - leftFilter, rightFilter sql.Expression, joinFilters []sql.Expression, isLeftJoin bool, excludeNulls bool, ) (*mergeJoinKvIter, error) { return &mergeJoinKvIter{ - leftIter: leftIter, - rightIter: rightIter, + leftIter: leftState.iter, + rightIter: rightState.iter, joiner: joiner, lrCmp: lrComparer, llCmp: llComparer, - leftNorm: leftNorm, - rightNorm: rightNorm, + leftNorm: leftState.norm, + rightNorm: rightState.norm, joinFilters: joinFilters, - leftFilter: leftFilter, - rightFilter: rightFilter, + leftFilter: leftState.filter, + rightFilter: rightState.filter, isLeftJoin: isLeftJoin, excludeNulls: excludeNulls, }, nil @@ -105,41 +103,54 @@ func (l *mergeJoinKvIter) Next(ctx *sql.Context) (sql.Row, error) { if l.leftKey == nil { if err := l.initialize(ctx); err != nil { if errors.Is(err, io.EOF) && l.isLeftJoin { - goto exhaustLeft + return 
l.exhaustLeftReturn(ctx) } return nil, err } } if l.exhaustLeft { - goto exhaustLeft + return l.exhaustLeftReturn(ctx) } if len(l.lookaheadBuf) > 0 || l.matchPos > 0 { goto match } -incr: +compare: // compare the left/right keys. // if equal continue to match buffer stage // if unequal increment one side - switch l.lrCmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { - case -1: - // left side has to consider left join non-matches - var oldLeftKey, oldLeftVal val.Tuple - { - // left join state - if !l.matchedLeft && l.isLeftJoin { - oldLeftKey, oldLeftVal = l.leftKey, l.leftVal + for { + switch l.lrCmp(l.leftKey, l.leftVal, l.rightKey, l.rightVal) { + case -1: + // left side has to consider left join non-matches + var oldLeftKey, oldLeftVal val.Tuple + { + // left join state + if !l.matchedLeft && l.isLeftJoin { + oldLeftKey, oldLeftVal = l.leftKey, l.leftVal + } + l.matchedLeft = false } - l.matchedLeft = false - } - l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) - if err != nil { - if errors.Is(err, io.EOF) && oldLeftKey != nil { - l.exhaustLeft = true - candidate, ok, err := l.tryReturn(ctx, oldLeftKey, oldLeftVal, nil, nil) + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) && oldLeftKey != nil { + l.exhaustLeft = true + candidate, ok, err := l.buildResultRow(ctx, oldLeftKey, oldLeftVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return candidate, nil + } + } + return nil, err + } + + if oldLeftKey != nil { + candidate, ok, err := l.buildResultRow(ctx, oldLeftKey, oldLeftVal, nil, nil) if err != nil { return nil, err } @@ -147,102 +158,13 @@ incr: return candidate, nil } } - return nil, err - } - - if oldLeftKey != nil { - candidate, ok, err := l.tryReturn(ctx, oldLeftKey, oldLeftVal, nil, nil) - if err != nil { + case 0: + if err := l.fillMatchBuf(ctx); err != nil { return nil, err } - if ok { - return candidate, nil - } - } - case 0: - goto matchBuf - case +1: - // right side considers 
lookahead used for match buffer - if l.nextRightKey != nil { - l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal - l.nextRightKey = nil - } else { - l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) - if err != nil { - if errors.Is(err, io.EOF) && l.isLeftJoin { - goto exhaustLeft - } - return nil, err - } - } - } - if l.leftKey == nil || l.rightKey == nil { - return nil, io.EOF - } - goto incr - -matchBuf: - // Fill lookahead buffer with all right side keys that match current - // left key. |l.nextRightKey| can be a lookahead key or nil at the - // end of this stage. - l.nextRightKey, l.nextRightVal, err = l.rightIter.Next(ctx) - if err != nil { - if errors.Is(err, io.EOF) { - // this is OK, but need to skip nil key comparison goto match - } - return nil, err - } - if l.lrCmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { - l.lookaheadBuf = append(l.lookaheadBuf, l.nextRightKey, l.nextRightVal) - goto matchBuf - } - -match: - // We start this stage with at least one match. We consider the - // lookahead buffer matches before |l.rightKey| because we can - // track state with a single |l.matchPos| variable. Matching merge - // condition does not guarantee the rest of the filters will match. - if l.matchPos < len(l.lookaheadBuf) { - candidate, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) - if err != nil { - return nil, err - } - l.matchPos += 2 - l.matchedLeft = ok - if !ok { - goto match - } - return candidate, nil - } else if l.matchPos == len(l.lookaheadBuf) { - candidate, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, l.rightKey, l.rightVal) - if err != nil { - return nil, err - } - l.matchedLeft = ok - l.matchPos++ - if !ok { - goto match - } - return candidate, nil - } else { - // We exhausted matches for the current |l.leftKey|. - // See whether we should flush or reuse the right buffer - // with the next left key. 
- l.matchPos = 0 - - // compare the current to the next left key - tmpKey, tmpVal := l.leftKey, l.leftVal - l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) - if err != nil { - return nil, err - } - cmp := l.llCmp(tmpKey, tmpVal, l.leftKey, l.leftVal) - - if cmp != 0 { - // if the left key is new, invalidate lookahead buffer and - // advance the right side - l.lookaheadBuf = l.lookaheadBuf[:0] + case +1: + // right side considers lookahead used for match buffer if l.nextRightKey != nil { l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal l.nextRightKey = nil @@ -250,63 +172,108 @@ match: l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) if err != nil { if errors.Is(err, io.EOF) && l.isLeftJoin { - // need to check |tmpKey| for left-join before - // jumping to exhaustLeft - l.exhaustLeft = true - } else { - return nil, err + return l.exhaustLeftReturn(ctx) } + return nil, err } } } + if l.leftKey == nil || l.rightKey == nil { + return nil, io.EOF + } + } - if l.isLeftJoin && !l.matchedLeft { - // consider left join after appropriate state transitions - ret, ok, err := l.tryReturn(ctx, tmpKey, tmpVal, nil, nil) +match: + // We start this stage with at least one match. We consider the + // lookahead buffer matches before |l.rightKey| because we can + // track state with a single |l.matchPos| variable. Matching merge + // condition does not guarantee the rest of the filters will match. 
+ for { + if l.matchPos < len(l.lookaheadBuf) { + candidate, ok, err := l.buildResultRow(ctx, l.leftKey, l.leftVal, l.lookaheadBuf[l.matchPos], l.lookaheadBuf[l.matchPos+1]) if err != nil { return nil, err } + l.matchPos += 2 + l.matchedLeft = ok if ok { - return ret, nil + return candidate, nil + } + } else if l.matchPos == len(l.lookaheadBuf) { + candidate, ok, err := l.buildResultRow(ctx, l.leftKey, l.leftVal, l.rightKey, l.rightVal) + if err != nil { + return nil, err + } + l.matchedLeft = ok + l.matchPos++ + if ok { + return candidate, nil + } + } else { + // We exhausted matches for the current |l.leftKey|. + // See whether we should flush or reuse the right buffer + // with the next left key. + l.matchPos = 0 + + // compare the current to the next left key + tmpKey, tmpVal := l.leftKey, l.leftVal + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + cmp := l.llCmp(tmpKey, tmpVal, l.leftKey, l.leftVal) + + if cmp != 0 { + // if the left key is new, invalidate lookahead buffer and + // advance the right side + l.lookaheadBuf = l.lookaheadBuf[:0] + if l.nextRightKey != nil { + l.rightKey, l.rightVal = l.nextRightKey, l.nextRightVal + l.nextRightKey = nil + } else { + l.rightKey, l.rightVal, err = l.rightIter.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) && l.isLeftJoin { + // need to check |tmpKey| for left-join before + // jumping to exhaustLeft + l.exhaustLeft = true + } else { + return nil, err + } + } + } } - } - l.matchedLeft = false - if cmp == 0 { - // The left keys are equivalent, the right-side lookahead - // buffer is still valid for the new left key. The only reason - // we didn't short-circuit this check earlier is for left-joins. 
- goto match - } - if l.exhaustLeft { - goto exhaustLeft - } - goto incr - } + if l.isLeftJoin && !l.matchedLeft { + // consider left join after appropriate state transitions + ret, ok, err := l.buildResultRow(ctx, tmpKey, tmpVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return ret, nil + } + } + l.matchedLeft = false -exhaustLeft: - l.exhaustLeft = true - if l.leftKey == nil { - return nil, io.EOF - } - if l.matchedLeft { - l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) - if err != nil { - return nil, err + if cmp == 0 { + // The left keys are equivalent, the right-side lookahead + // buffer is still valid for the new left key. The only reason + // we didn't short-circuit this check earlier is for left-joins. + goto match + } + if l.exhaustLeft { + return l.exhaustLeftReturn(ctx) + } + goto compare } } - l.matchedLeft = true // simplifies loop - ret, ok, err := l.tryReturn(ctx, l.leftKey, l.leftVal, nil, nil) - if err != nil { - return nil, err - } - if ok { - return ret, nil - } - goto exhaustLeft } -func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, bool, error) { +// buildResultRow combines a set of key/value tuples into an output row +// and checks it against filter expressions. Return the row, a boolean +// that indicates whether it passed filter checks, and an error. 
+func (l *mergeJoinKvIter) buildResultRow(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, bool, error) { candidate, err := l.buildCandidate(ctx, leftKey, leftVal, rightKey, rightVal) if err != nil { return nil, false, err @@ -351,6 +318,71 @@ func (l *mergeJoinKvIter) tryReturn(ctx *sql.Context, leftKey, leftVal, rightKey return candidate, true, nil } +func (l *mergeJoinKvIter) exhaustLeftReturn(ctx *sql.Context) (sql.Row, error) { + l.exhaustLeft = true + if l.leftKey == nil { + return nil, io.EOF + } + var err error + for { + if l.matchedLeft { + l.leftKey, l.leftVal, err = l.leftIter.Next(ctx) + if err != nil { + return nil, err + } + } + l.matchedLeft = true // simplifies loop + ret, ok, err := l.buildResultRow(ctx, l.leftKey, l.leftVal, nil, nil) + if err != nil { + return nil, err + } + if ok { + return ret, nil + } + } +} + +/* +matchBuf: + // Fill lookahead buffer with all right side keys that match current + // left key. |l.nextRightKey| can be a lookahead key or nil at the + // end of this stage. + l.nextRightKey, l.nextRightVal, err = l.rightIter.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + // this is OK, but need to skip nil key comparison + goto match + } + return nil, err + } + if l.lrCmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { + l.lookaheadBuf = append(l.lookaheadBuf, l.nextRightKey, l.nextRightVal) + goto matchBuf + } +*/ + +// Fill lookahead buffer with all right side keys that match current +// left key. |l.nextRightKey| can be a lookahead key or nil at the +// end of this stage. 
+func (l *mergeJoinKvIter) fillMatchBuf(ctx *sql.Context) error { + var err error + for { + l.nextRightKey, l.nextRightVal, err = l.rightIter.Next(ctx) + if err != nil { + if errors.Is(err, io.EOF) { + // this is OK, but need to skip nil key comparison + return nil + } + return err + } + if l.lrCmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { + l.lookaheadBuf = append(l.lookaheadBuf, l.nextRightKey, l.nextRightVal) + } else { + return nil + } + } +} + func (l *mergeJoinKvIter) buildCandidate(ctx *sql.Context, leftKey, leftVal, rightKey, rightVal val.Tuple) (sql.Row, error) { var err error if l.leftNorm != nil && leftKey != nil { From 93cf89dfbd64d01aec1c9f448566ec91a3c51738 Mon Sep 17 00:00:00 2001 From: max-hoffman Date: Fri, 22 Nov 2024 19:11:25 +0000 Subject: [PATCH 14/15] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/kvexec/builder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index c0de4c1b260..909ed5c52f5 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -17,7 +17,6 @@ package kvexec import ( "context" "fmt" - "github.com/dolthub/dolt/go/store/types" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/expression" @@ -31,6 +30,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/sqle/index" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/prolly/tree" + "github.com/dolthub/dolt/go/store/types" "github.com/dolthub/dolt/go/store/val" ) From 2c2748b862eacde6d1172ef15e3c79b8a2df56b2 Mon Sep 17 00:00:00 2001 From: Max Hoffman Date: Fri, 22 Nov 2024 11:45:21 -0800 Subject: [PATCH 15/15] more comments --- go/libraries/doltcore/sqle/kvexec/builder.go | 12 +++++---- .../doltcore/sqle/kvexec/merge_join.go | 25 +++---------------- 2 files changed, 10 
insertions(+), 27 deletions(-) diff --git a/go/libraries/doltcore/sqle/kvexec/builder.go b/go/libraries/doltcore/sqle/kvexec/builder.go index c0de4c1b260..4336f619131 100644 --- a/go/libraries/doltcore/sqle/kvexec/builder.go +++ b/go/libraries/doltcore/sqle/kvexec/builder.go @@ -447,21 +447,23 @@ func getSourceKv(ctx *sql.Context, n sql.Node, isSrc bool) (prolly.Map, prolly.M // primary index key/value tuple. type coveringNormalizer func(val.Tuple) (val.Tuple, val.Tuple, error) +// mergeState aggregates the information needed to build one side of a +// merge join iterator. type mergeState struct { - // secondary index being read + // idxMap is the secondary index being read idxMap prolly.Map - // merge iterator + // iter is the index merge iterator iter prolly.MapIter // schemas for primary and secondary index. // if the index is covering these are the same priSch schema.Schema idxSch schema.Schema - // output projection ordering + // tags are the output projection/ordering tags []uint64 - // filter for just this relation + // filter is a relation-specific filter (usually nil) filter sql.Expression // norm is not nil when a non-covering index - // needs a callback into the primary index. + // needs a callback into the primary index norm coveringNormalizer } diff --git a/go/libraries/doltcore/sqle/kvexec/merge_join.go b/go/libraries/doltcore/sqle/kvexec/merge_join.go index a62623686cd..1cdfaa67408 100644 --- a/go/libraries/doltcore/sqle/kvexec/merge_join.go +++ b/go/libraries/doltcore/sqle/kvexec/merge_join.go @@ -178,9 +178,6 @@ compare: } } } - if l.leftKey == nil || l.rightKey == nil { - return nil, io.EOF - } } match: @@ -342,25 +339,6 @@ func (l *mergeJoinKvIter) exhaustLeftReturn(ctx *sql.Context) (sql.Row, error) { } } -/* -matchBuf: - // Fill lookahead buffer with all right side keys that match current - // left key. |l.nextRightKey| can be a lookahead key or nil at the - // end of this stage. 
- l.nextRightKey, l.nextRightVal, err = l.rightIter.Next(ctx) - if err != nil { - if errors.Is(err, io.EOF) { - // this is OK, but need to skip nil key comparison - goto match - } - return nil, err - } - if l.lrCmp(l.leftKey, l.leftVal, l.nextRightKey, l.nextRightVal) == 0 { - l.lookaheadBuf = append(l.lookaheadBuf, l.nextRightKey, l.nextRightVal) - goto matchBuf - } -*/ - // Fill lookahead buffer with all right side keys that match current // left key. |l.nextRightKey| can be a lookahead key or nil at the // end of this stage. @@ -516,6 +494,9 @@ func mergeComparer( return lrCmp, llCmp, true } +// schemaIsCovering returns true if all projection tags are found in the +// source schema. If any tag is not found in the schema, the primary index +// has to be accessed to complete the |projections| list. func schemaIsCovering(sch schema.Schema, projections []uint64) bool { cols := sch.GetAllCols() if len(projections) > cols.Size() {