diff --git a/.gitignore b/.gitignore index c3f95eb9..2d52f6b1 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ example/output/* # exception to the rule !example/output/.gitkeep +vendor/ +.vscode/ +.github/ diff --git a/reader/columnbuffer.go b/reader/columnbuffer.go index aae1de3e..60a8e20f 100644 --- a/reader/columnbuffer.go +++ b/reader/columnbuffer.go @@ -191,7 +191,7 @@ func (cbt *ColumnBufferType) ReadPageForSkip() (*layout.Page, error) { } } -func (cbt *ColumnBufferType) SkipRows(num int64) int64 { +func (cbt *ColumnBufferType) SkipRows(num int64) (int64, error) { var ( err error page *layout.Page @@ -199,6 +199,9 @@ func (cbt *ColumnBufferType) SkipRows(num int64) int64 { for cbt.DataTableNumRows < num && err == nil { page, err = cbt.ReadPageForSkip() + if err != nil { + return 0, err + } } if num > cbt.DataTableNumRows { @@ -207,7 +210,7 @@ func (cbt *ColumnBufferType) SkipRows(num int64) int64 { if page != nil { if err = page.GetValueFromRawData(cbt.SchemaHandler); err != nil { - return 0 + return 0, err } page.Decode(cbt.DictPage) @@ -226,18 +229,17 @@ func (cbt *ColumnBufferType) SkipRows(num int64) int64 { cbt.DataTable.Merge(tmp) } - return num + return num, nil } -func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64) { - if cbt.Footer.NumRows == 0 { - return &layout.Table{}, 0 - } - +func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64, error) { var err error for cbt.DataTableNumRows < num && err == nil { err = cbt.ReadPage() + if err != nil { + return nil, 0, err + } } if cbt.DataTableNumRows < 0 { @@ -252,11 +254,11 @@ func (cbt *ColumnBufferType) ReadRows(num int64) (*layout.Table, int64) { res := cbt.DataTable.Pop(num) cbt.DataTableNumRows -= num - if cbt.DataTableNumRows <= 0 { //release previous slice memory + if cbt.DataTableNumRows <= 0 { // release previous slice memory tmp := cbt.DataTable cbt.DataTable = layout.NewTableFromTable(tmp) cbt.DataTable.Merge(tmp) } - return res, num + return res, num, nil } diff --git a/reader/columnreader.go b/reader/columnreader.go index a27e986e..52c745d8 100644 --- a/reader/columnreader.go +++ b/reader/columnreader.go @@ -80,8 +80,8 @@ func (pr *ParquetReader) ReadColumnByPath(pathStr string, num int64) (values []i } if cb, ok := pr.ColumnBuffers[pathStr]; ok { - table, _ := cb.ReadRows(int64(num)) - return table.Values, table.RepetitionLevels, table.DefinitionLevels, nil + table, _, err := cb.ReadRows(int64(num)) + return table.Values, table.RepetitionLevels, table.DefinitionLevels, err } return []interface{}{}, []int32{}, []int32{}, errPathNotFound } diff --git a/reader/reader.go b/reader/reader.go index 0c9253b6..5bfdce4a 100644 --- a/reader/reader.go +++ b/reader/reader.go @@ -3,6 +3,8 @@ package reader import ( "context" "encoding/binary" + "errors" + "fmt" "io" "reflect" "strings" @@ -30,7 +32,7 @@ type ParquetReader struct { ObjPartialType reflect.Type } -//Create a parquet reader: obj is a object with schema tags or a JSON schema string +// Create a parquet reader: obj is a object with schema tags or a JSON schema string func NewParquetReader(pFile source.ParquetFile, obj interface{}, np int64) (*ParquetReader, error) { var err error res := new(ParquetReader) @@ -95,7 +97,7 @@ func (pr *ParquetReader) SetSchemaHandlerFromJSON(jsonSchema string) error { return nil } -//Rename schema name to inname +// Rename schema name to inname func (pr *ParquetReader) RenameSchema() { for i := 0; i < len(pr.SchemaHandler.Infos); i++ { pr.Footer.Schema[i].Name = pr.SchemaHandler.Infos[i].InName @@ -118,7 +120,7 @@ func (pr *ParquetReader) GetNumRows() int64 { return pr.Footer.GetNumRows() } -//Get the footer size +// Get the footer size func (pr *ParquetReader) GetFooterSize() (uint32, error) { var err error buf := make([]byte, 4) @@ -132,7 +134,7 @@ func (pr *ParquetReader) GetFooterSize() (uint32, error) { return size, err } -//Read footer from parquet file +// Read footer from parquet file func (pr *ParquetReader) ReadFooter() error { size, err := pr.GetFooterSize() if err != nil { @@ -143,13 +145,11 @@ func (pr *ParquetReader) ReadFooter() error { } pr.Footer = parquet.NewFileMetaData() pf := thrift.NewTCompactProtocolFactory() - thriftReader := thrift.NewStreamTransportR(pr.PFile) - bufferReader := thrift.NewTBufferedTransport(thriftReader, int(size)) - protocol := pf.GetProtocol(bufferReader) + protocol := pf.GetProtocol(thrift.NewStreamTransportR(pr.PFile)) return pr.Footer.Read(context.TODO(), protocol) } -//Skip rows of parquet file +// Skip rows of parquet file func (pr *ParquetReader) SkipRows(num int64) error { var err error if num <= 0 { @@ -195,7 +195,7 @@ func (pr *ParquetReader) SkipRows(num int64) error { return err } -//Read rows of parquet file and unmarshal all to dst +// Read rows of parquet file and unmarshal all to dst func (pr *ParquetReader) Read(dstInterface interface{}) error { return pr.read(dstInterface, "") } @@ -226,7 +226,7 @@ func (pr *ParquetReader) ReadByNumber(maxReadNumber int) ([]interface{}, error) return ret, nil } -//Read rows of parquet file and unmarshal all to dst +// Read rows of parquet file and unmarshal all to dst func (pr *ParquetReader) ReadPartial(dstInterface interface{}, prefixPath string) error { prefixPath, err := pr.SchemaHandler.ConvertToInPathStr(prefixPath) if err != nil { @@ -262,7 +262,7 @@ func (pr *ParquetReader) ReadPartialByNumber(maxReadNumber int, prefixPath strin return ret, nil } -//Read rows of parquet file with a prefixPath +// Read rows of parquet file with a prefixPath func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error { var err error tmap := make(map[string]*layout.Table) @@ -273,7 +273,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error return nil } - doneChan := make(chan int, pr.NP) + doneChan := make(chan error, pr.NP) taskChan := make(chan string, len(pr.ColumnBuffers)) stopChan := make(chan int) @@ -285,7 +285,11 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error return case pathStr := <-taskChan: cb := pr.ColumnBuffers[pathStr] - table, _ := cb.ReadRows(int64(num)) + table, _, err := cb.ReadRows(int64(num)) + if err != nil { + doneChan <- err + return + } locker.Lock() if _, ok := tmap[pathStr]; ok { tmap[pathStr].Merge(table) @@ -294,7 +298,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error tmap[pathStr].Merge(table) } locker.Unlock() - doneChan <- 0 + doneChan <- nil } } }() @@ -307,14 +311,19 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error readNum++ } } + errs := make([]error, 0) for i := 0; i < readNum; i++ { - <-doneChan + err := <-doneChan + if err != nil { + errs = append(errs, err) + } } - for i := int64(0); i < pr.NP; i++ { stopChan <- 0 } - + if err = errors.Join(errs...); err != nil { + return fmt.Errorf("error reading parquet file: %w", err) + } dstList := make([]interface{}, pr.NP) delta := (int64(num) + pr.NP - 1) / pr.NP @@ -352,7 +361,7 @@ func (pr *ParquetReader) read(dstInterface interface{}, prefixPath string) error return err } -//Stop Read +// Stop Read func (pr *ParquetReader) ReadStop() { for _, cb := range pr.ColumnBuffers { if cb != nil {