Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kvexec] merge join #8561

Merged
merged 19 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 50 additions & 13 deletions go/libraries/doltcore/sqle/index/index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,25 @@ func (rp rangePartition) Key() []byte {
return rp.key
}

func GetDurableIndex(ctx *sql.Context,
tab DoltTableable,
idx DoltIndex) (durable.Index, error) {
di := idx.(*doltIndex)
s, err := di.getDurableState(ctx, tab)
if err != nil {
return nil, err
}
return s.Secondary, nil
}

// IndexScanBuilder generates secondary lookups for partitions and
// encapsulates fast path optimizations for certain point lookups.
type IndexScanBuilder interface {
IndexRangeIterable

// NewPartitionRowIter returns a sql.RowIter for an index partition.
NewPartitionRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error)

// NewRangeMapIter returns a prolly.MapIter for an index partition.
NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error)

// NewSecondaryIter returns an object used to perform secondary lookups
// for index joins.
NewSecondaryIter(strict bool, cnt int, nullSafe []bool) SecondaryLookupIterGen
Expand All @@ -255,6 +265,11 @@ type IndexScanBuilder interface {
OutputSchema() schema.Schema
}

type IndexRangeIterable interface {
// NewRangeMapIter returns a prolly.MapIter for an index partition.
NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error)
}

func NewIndexReaderBuilder(
ctx *sql.Context,
tab DoltTableable,
Expand Down Expand Up @@ -339,12 +354,11 @@ func newNonCoveringLookupBuilder(s *durableIndexState, b *baseIndexImplBuilder)
primary := durable.ProllyMapFromIndex(s.Primary)
priKd, _ := primary.Descriptors()
tbBld := val.NewTupleBuilder(priKd)
pkMap := ordinalMappingFromIndex(b.idx)
pkMap := OrdinalMappingFromIndex(b.idx)
keyProj, valProj, ordProj := projectionMappings(b.idx.Schema(), b.projections)
return &nonCoveringIndexImplBuilder{
baseIndexImplBuilder: b,
pri: primary,
priKd: priKd,
pkBld: tbBld,
pkMap: pkMap,
keyMap: keyProj,
Expand Down Expand Up @@ -439,18 +453,18 @@ type coveringIndexImplBuilder struct {
keyMap, valMap, ordMap val.OrdinalMapping
}

func NewSequenceRangeIter(ctx context.Context, ib IndexScanBuilder, ranges []prolly.Range, reverse bool) (prolly.MapIter, error) {
func NewSequenceRangeIter(ctx context.Context, irIter IndexRangeIterable, ranges []prolly.Range, reverse bool) (prolly.MapIter, error) {
if len(ranges) == 0 {
return &strictLookupIter{}, nil
}
// TODO: probably need to do something with Doltgres ranges here?
cur, err := ib.NewRangeMapIter(ctx, ranges[0], reverse)
cur, err := irIter.NewRangeMapIter(ctx, ranges[0], reverse)
if err != nil || len(ranges) < 2 {
return cur, err
}
return &sequenceRangeIter{
cur: cur,
ib: ib,
irIter: irIter,
reverse: reverse,
remainingRanges: ranges[1:],
}, nil
Expand All @@ -461,7 +475,7 @@ func NewSequenceRangeIter(ctx context.Context, ib IndexScanBuilder, ranges []pro
// an underlying map.
type sequenceRangeIter struct {
cur prolly.MapIter
ib IndexScanBuilder
irIter IndexRangeIterable
reverse bool
remainingRanges []prolly.Range
}
Expand All @@ -475,7 +489,7 @@ func (i *sequenceRangeIter) Next(ctx context.Context) (val.Tuple, val.Tuple, err
if len(i.remainingRanges) == 0 {
return nil, nil, io.EOF
}
i.cur, err = i.ib.NewRangeMapIter(ctx, i.remainingRanges[0], i.reverse)
i.cur, err = i.irIter.NewRangeMapIter(ctx, i.remainingRanges[0], i.reverse)
if err != nil {
return nil, nil, err
}
Expand All @@ -487,6 +501,23 @@ func (i *sequenceRangeIter) Next(ctx context.Context) (val.Tuple, val.Tuple, err
return k, v, nil
}

type secIterGen struct {
m prolly.Map
}

func NewSecondaryIterGen(m prolly.Map) IndexRangeIterable {
return secIterGen{m: m}
}

// NewRangeMapIter implements IndexScanBuilder
func (si secIterGen) NewRangeMapIter(ctx context.Context, r prolly.Range, reverse bool) (prolly.MapIter, error) {
if reverse {
return si.m.IterRangeReverse(ctx, r)
} else {
return si.m.IterRange(ctx, r)
}
}

func (ib *coveringIndexImplBuilder) OutputSchema() schema.Schema {
return ib.idx.IndexSchema()
}
Expand Down Expand Up @@ -536,7 +567,6 @@ type nonCoveringIndexImplBuilder struct {
*baseIndexImplBuilder

pri prolly.Map
priKd val.TupleDesc
pkBld *val.TupleBuilder

pkMap, keyMap, valMap, ordMap val.OrdinalMapping
Expand Down Expand Up @@ -628,6 +658,13 @@ func (ib *nonCoveringIndexImplBuilder) NewSecondaryIter(strict bool, cnt int, nu
}
}

func NewKeylessIndexImplBuilder(pri, sec durable.Index, idx DoltIndex) *keylessIndexImplBuilder {
return &keylessIndexImplBuilder{
baseIndexImplBuilder: &baseIndexImplBuilder{idx: idx.(*doltIndex)},
s: &durableIndexState{Primary: pri, Secondary: sec},
}
}

// TODO keylessIndexImplBuilder should be similar to the non-covering
// index case, where we will need to reference the primary index,
// but can take advantage of point lookup optimizations
Expand All @@ -651,7 +688,7 @@ func (ib *keylessIndexImplBuilder) NewRangeMapIter(ctx context.Context, r prolly
}
clustered := durable.ProllyMapFromIndex(rows)
keyDesc := clustered.KeyDesc()
indexMap := ordinalMappingFromIndex(ib.idx)
indexMap := OrdinalMappingFromIndex(ib.idx)

keyBld := val.NewTupleBuilder(keyDesc)

Expand Down Expand Up @@ -721,7 +758,7 @@ func (ib *keylessIndexImplBuilder) NewSecondaryIter(strict bool, cnt int, nullSa
pri: pri,
sec: secondary,
sch: ib.idx.tableSch,
pkMap: ordinalMappingFromIndex(ib.idx),
pkMap: OrdinalMappingFromIndex(ib.idx),
pkBld: pkBld,
prefixDesc: secondary.KeyDesc().PrefixDesc(cnt),
}
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/sqle/index/prolly_index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func newProllyIndexIter(
primary := durable.ProllyMapFromIndex(dprimary)
kd, _ := primary.Descriptors()
pkBld := val.NewTupleBuilder(kd)
pkMap := ordinalMappingFromIndex(idx)
pkMap := OrdinalMappingFromIndex(idx)
keyProj, valProj, ordProj := projectionMappings(idx.Schema(), projections)

iter := prollyIndexIter{
Expand Down Expand Up @@ -135,7 +135,7 @@ func (p prollyIndexIter) Close(*sql.Context) error {
return nil
}

func ordinalMappingFromIndex(idx DoltIndex) (m val.OrdinalMapping) {
func OrdinalMappingFromIndex(idx DoltIndex) (m val.OrdinalMapping) {
def := idx.Schema().Indexes().GetByName(idx.ID())
pks := def.PrimaryKeyTags()
if len(pks) == 0 { // keyless index
Expand Down Expand Up @@ -314,7 +314,7 @@ func newProllyKeylessIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range

clustered := durable.ProllyMapFromIndex(rows)
keyDesc, valDesc := clustered.Descriptors()
indexMap := ordinalMappingFromIndex(idx)
indexMap := OrdinalMappingFromIndex(idx)
keyBld := val.NewTupleBuilder(keyDesc)
sch := idx.Schema()
_, vm, om := projectionMappings(sch, projections)
Expand Down
Loading
Loading