Skip to content

Commit

Permalink
add parquet.RowBuilder (xitongsys#457)
Browse files Browse the repository at this point in the history
* add parquet.RowBuilder

* fixes for parquet row generation

* add parquet.(*RowBuilder).Next

* move all merge code to merge.go (xitongsys#458)

* move all merge code to merge.go

* add example

* add benchmark for parquet.(*RowBuilder).Add

* initialize FIXED_LEN_BYTE_ARRAY columns to non-null zero-value

* make it OK to call Next if the repetition level is zero
  • Loading branch information
Achille authored Dec 14, 2022
1 parent efaee6e commit fe0b2f5
Show file tree
Hide file tree
Showing 7 changed files with 894 additions and 401 deletions.
67 changes: 67 additions & 0 deletions merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,71 @@ import (
"io"
)

// MergeRowGroups constructs a row group which is a merged view of rowGroups. If
// rowGroups are sorted and the passed options include sorting, the merged row
// group will also be sorted.
//
// The function validates the input to ensure that the merge operation is
// possible, ensuring that the schemas match or can be converted to an
// optionally configured target schema passed as argument in the option list.
//
// The sorting columns of each row group are also consulted to determine whether
// the output can be represented. If sorting columns are configured on the merge
// they must be a prefix of sorting columns of all row groups being merged.
func MergeRowGroups(rowGroups []RowGroup, options ...RowGroupOption) (RowGroup, error) {
config, err := NewRowGroupConfig(options...)
if err != nil {
return nil, err
}

schema := config.Schema
if len(rowGroups) == 0 {
return newEmptyRowGroup(schema), nil
}
if schema == nil {
schema = rowGroups[0].Schema()

for _, rowGroup := range rowGroups[1:] {
if !nodesAreEqual(schema, rowGroup.Schema()) {
return nil, ErrRowGroupSchemaMismatch
}
}
}

mergedRowGroups := make([]RowGroup, len(rowGroups))
copy(mergedRowGroups, rowGroups)

for i, rowGroup := range mergedRowGroups {
if rowGroupSchema := rowGroup.Schema(); !nodesAreEqual(schema, rowGroupSchema) {
conv, err := Convert(schema, rowGroupSchema)
if err != nil {
return nil, fmt.Errorf("cannot merge row groups: %w", err)
}
mergedRowGroups[i] = ConvertRowGroup(rowGroup, conv)
}
}

m := &mergedRowGroup{sorting: config.Sorting.SortingColumns}
m.init(schema, mergedRowGroups)

if len(m.sorting) == 0 {
// When the row group has no ordering, use a simpler version of the
// merger which simply concatenates rows from each of the row groups.
// This is preferable because it makes the output deterministic, the
// heap merge may otherwise reorder rows across groups.
return &m.multiRowGroup, nil
}

for _, rowGroup := range m.rowGroups {
if !sortingColumnsHavePrefix(rowGroup.SortingColumns(), m.sorting) {
return nil, ErrRowGroupSortingColumnsMismatch
}
}

m.compare = compareRowsFuncOf(schema, m.sorting)
return m, nil
}

type mergedRowGroup struct {
multiRowGroup
sorting []SortingColumn
Expand Down Expand Up @@ -83,6 +148,8 @@ func (r *mergedRowGroupRows) Schema() *Schema {
return r.schema
}

// MergeRowReader constructs a RowReader which creates an ordered sequence of
// all the readers using the given compare function as the ordering predicate.
func MergeRowReaders(readers []RowReader, compare func(Row, Row) int) RowReader {
return &mergedRowReader{
compare: compare,
Expand Down
Loading

0 comments on commit fe0b2f5

Please sign in to comment.